*~Vincent*
On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <k...@apache.org> wrote:
> The reason I was checking out the code is that sometimes a natural thing
> to output would be a summary of what was written. So each chunk of writes
> and the final chunk written in @FinishBundle. This is, for example, what
> SQL engines do (output # of rows written).
>
> You could output both the summary and the full list of written elements to
> different outputs, and users can choose. Outputs that are never consumed
> should be very low or zero cost.
>
> I like this approach. I would much prefer two outputs (one of which is all
> elements written) to returning an existential/wildcard PCollection.
>
> Kenn
>
> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> Yeah, the entire input is not always what is needed, and can generally be
>> achieved via
>>
>> input -> wait(side input of write) -> do something with the input
>>
>> Of course one could also do
>>
>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>> CombineGlobally(TrivialCombineFn)
>>
>> to reduce this to a more minimal set with at least one element per
>> Window.
>>
>> The file writing operations emit the actual files that were written,
>> which can be handy. My suggestion of PCollection<?> was just so that we can
>> emit something usable, and decide later exactly what is most useful.
>>
>>
>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I believe that the Wait transform turns this output into a side input,
>>> so outputting the input PCollection might be problematic.
>>>
>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>> just reading really quickly, so sorry if I missed something...
>>>>
>>>> Checking out the code for the WriteFn<T>, I see a big problem:
>>>>
>>>> @Setup
>>>> public void setup() {
>>>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>> }
>>>>
>>>> @ProcessElement
>>>> public void processElement(ProcessContext c)
>>>>     throws ExecutionException, InterruptedException {
>>>>   writer.mutate(c.element());
>>>> }
>>>>
>>>> @Teardown
>>>> public void teardown() throws Exception {
>>>>   writer.close();
>>>>   writer = null;
>>>> }
>>>>
>>>> It is only in writer.close() that all async writes are waited on. This
>>>> needs to happen in @FinishBundle.
>>>>
>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>
>>>> Until you have waited on the future, you should not output the element
>>>> as "has been written". And you cannot output from the @Teardown method,
>>>> which is just for cleaning up resources.
>>>>
>>>> Am I reading this wrong?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
>>>>
>>>>> How about a PCollection containing every element which was
>>>>> successfully written?
>>>>> Basically the same things which were passed into it.
>>>>>
>>>>> Then you could act on every element after it's been successfully
>>>>> written to the sink.
>>>>>
>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>
>>>>>>> +dev
>>>>>>>
>>>>>>> Since we all agree that we should return something different than
>>>>>>> PDone, the real question is what we should return.
>>>>>>>
>>>>>>
>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>> internally, of something contentless like nulls. This is future
>>>>>> compatible with returning something more meaningful based on the
>>>>>> source or write process itself, but at least this would be followable.
>>>>>>
>>>>>>> As a reminder, we had a pretty interesting discussion about this
>>>>>>> already in the past, but uniformization of our return values has not
>>>>>>> happened.
>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>> contribute Write transforms that return a value.
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>>
>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>> > changing it now would be backwards incompatible.
>>>>>>>
>>>>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>>>>> up to the maintainers to judge whether the upgrade to return something
>>>>>>> other than PDone is worth it. If it is, we can deprecate and remove
>>>>>>> the previous signature in a short time (2 releases was the average for
>>>>>>> previous cases).
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>> >
>>>>>>> > I thought this was about returning a PCollection of write
>>>>>>> > results, as is done in other IOs (as I mentioned as examples) that have
>>>>>>> > _additional_ write methods, like “withWriteResults()” etc., that return
>>>>>>> > PTransform<…, PCollection<WriteResults>>.
>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>> > functionality. Though, we need to follow the same pattern for the user
>>>>>>> > API and maybe even naming for this feature across different IOs (like
>>>>>>> > we have for “readAll()” methods).
>>>>>>> >
>>>>>>> > I agree that we have to avoid returning PDone for such cases.
>>>>>>> >
>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com> wrote:
>>>>>>> >
>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>> > changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>> > returning variants (probably as another option to the builders) that
>>>>>>> > compose well with Wait, etc. would be welcome.
>>>>>>> >
>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> In this case, I think the “Wait” PTransform should work for you, but,
>>>>>>> >> as was mentioned before, it doesn’t work with PDone, only with a
>>>>>>> >> PCollection as a signal.
>>>>>>> >>
>>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>>> >> great to contribute it back to Beam the way it was done for other
>>>>>>> >> IOs (for example, JdbcIO [1] or BigtableIO [2]).
>>>>>>> >>
>>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>>> >> use with “Wait”, because this pattern is quite often required.
>>>>>>> >>
>>>>>>> >> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>> >> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>> >>
>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> No, it only needs to ensure that a record seen on Pubsub has been
>>>>>>> >> successfully written to a database. So "record by record" is fine,
>>>>>>> >> or even "bundle".
>>>>>>> >>
>>>>>>> >> ~Vincent
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Do you want to wait until ALL records are written to Cassandra
>>>>>>> >>> and then write all successfully written records to PubSub, or should
>>>>>>> >>> it be performed "record by record"?
>>>>>>> >>>
>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marq...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubsubIO.write
>>>>>>> >>>
>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>> >>>
>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>> >>>                            |
>>>>>>> >>>                            -> PubsubIO.write
>>>>>>> >>>
>>>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>>>> >>> successfully finished a (quorum) write.
>>>>>>> >>>
>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone>, I can't
>>>>>>> >>> actually use it here, so I often roll my own 'writer', but maybe
>>>>>>> >>> there is a recommended way of doing this?
>>>>>>> >>>
>>>>>>> >>> Thanks in advance for any help.
>>>>>>> >>>
>>>>>>> >>> ~Vincent
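Kenn's point in the thread is that async writes must be waited on in @FinishBundle, not only in @Teardown, before anything is reported as written. That lifecycle can be modeled without a Beam dependency. The sketch below is an illustration under stated assumptions, not CassandraIO's actual code: `BundleFlushingWriter`, `BundleResult` and the `saveAsync` function are hypothetical names, and `CompletableFuture` stands in for the driver's async save (such as `Mapper::saveAsync`). It returns both the full list of written elements and a row count, mirroring the two-output idea discussed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical, Beam-free model of a write DoFn's bundle lifecycle.
// saveAsync stands in for an async driver call such as Mapper::saveAsync.
class BundleFlushingWriter<T> {

  // What one bundle emits: the full list of written elements (one output)
  // and a summary row count (a second, cheaper output).
  static final class BundleResult<E> {
    final List<E> written;
    final long rowsWritten;

    BundleResult(List<E> written) {
      this.written = written;
      this.rowsWritten = written.size();
    }
  }

  private final Function<T, CompletableFuture<Void>> saveAsync;
  private final List<T> pendingElements = new ArrayList<>();
  private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

  BundleFlushingWriter(Function<T, CompletableFuture<Void>> saveAsync) {
    this.saveAsync = saveAsync;
  }

  // Plays the role of @ProcessElement: start the write, but emit nothing yet.
  void processElement(T element) {
    pendingElements.add(element);
    pendingWrites.add(saveAsync.apply(element));
  }

  // Plays the role of @FinishBundle: block until every write in this bundle
  // has completed. get() rethrows a failed write as an ExecutionException,
  // so the bundle fails instead of reporting that element as written.
  BundleResult<T> finishBundle() throws InterruptedException, ExecutionException {
    try {
      for (CompletableFuture<Void> write : pendingWrites) {
        write.get();
      }
      return new BundleResult<>(new ArrayList<>(pendingElements));
    } finally {
      // Reset per-bundle state whether or not the bundle succeeded.
      pendingElements.clear();
      pendingWrites.clear();
    }
  }

  public static void main(String[] args) throws Exception {
    BundleFlushingWriter<String> writer =
        new BundleFlushingWriter<>(s -> CompletableFuture.runAsync(() -> { /* pretend write */ }));
    writer.processElement("a");
    writer.processElement("b");
    BundleResult<String> result = writer.finishBundle();
    System.out.println(result.written + " rows=" + result.rowsWritten);
  }
}
```

Running `main` prints `[a, b] rows=2`. In an actual DoFn, @ProcessElement would likely also need to buffer each element's timestamp and window, because output from @FinishBundle goes through `FinishBundleContext.output(value, timestamp, window)`. The written elements and the count could then go to separate tagged outputs, with the written-elements output serving as the signal for `Wait.on(...)` ahead of `PubsubIO.write`.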