On Thu, Mar 25, 2021 at 12:55 PM Robert Bradshaw <rober...@google.com> wrote:
> On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez <vincent.marq...@gmail.com>
> wrote:
>
>>
>> *~Vincent*
>>
>>
>> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> The reason I was checking out the code is that sometimes a natural thing
>>> to output would be a summary of what was written: one for each chunk of
>>> writes, plus the final chunk written in @FinishBundle. This is, for
>>> example, what SQL engines do (output # of rows written).
>>>
>>> You could output both the summary and the full list of written elements
>>> to different outputs, and users can choose. Outputs that are never consumed
>>> should be very low or zero cost.
>>>
>>>
>> I like this approach. I would much prefer two outputs (one of which is
>> all elements written) to returning an existential/wildcard PCollection.
>>
>
> +1, this would work well too. Returning a PCollectionTuple is extensible
> too, as one could add more (or better) outputs in the future without
> changing the signature.
>

This comment is dangerously close to sparking a philosophical conversation!

Kenn

>
>>
>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Yeah, the entire input is not always what is needed, and can generally
>>>> be achieved via
>>>>
>>>> input -> wait(side input of write) -> do something with the input
>>>>
>>>> Of course one could also do
>>>>
>>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>>> CombineGlobally(TrivialCombineFn)
>>>>
>>>> to reduce this to a more minimal set with at least one element per
>>>> Window.
>>>>
>>>> The file writing operations emit the actual files that were written,
>>>> which can be handy. My suggestion of PCollection<?> was just so that we can
>>>> emit something usable, and decide exactly what is most useful later.
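To make the "two outputs" idea above concrete, here is a minimal plain-Java sketch. It has no Beam dependency, and every name in it (TwoOutputsSketch, WriteOutputs, writeInChunks) is hypothetical. It only illustrates the shape of a result that carries both the written elements and a per-chunk count, so a consumer can pick either one. In a real Beam transform the analogous shape would presumably be a multi-output ParDo whose results come back as a PCollectionTuple, as Robert suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; all names are hypothetical, not Beam API. */
final class TwoOutputsSketch {

  /** Two independently consumable outputs of one write. */
  static final class WriteOutputs<T> {
    /** Every element that was written. */
    final List<T> written = new ArrayList<>();
    /** Number of rows written per chunk, like a SQL "rows affected" count. */
    final List<Integer> chunkSummaries = new ArrayList<>();
  }

  /** Processes the input in chunks of at most chunkSize, recording both outputs. */
  static <T> WriteOutputs<T> writeInChunks(List<T> input, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    WriteOutputs<T> outputs = new WriteOutputs<>();
    for (int start = 0; start < input.size(); start += chunkSize) {
      int end = Math.min(start + chunkSize, input.size());
      List<T> chunk = input.subList(start, end);
      // Assumption: a real sink would perform and await the write here.
      outputs.written.addAll(chunk);
      outputs.chunkSummaries.add(chunk.size());
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<String> input = new ArrayList<>();
    input.add("a");
    input.add("b");
    input.add("c");
    WriteOutputs<String> outputs = writeInChunks(input, 2);
    System.out.println("written elements: " + outputs.written);
    System.out.println("rows per chunk: " + outputs.chunkSummaries);
  }
}
```

A consumer that only needs a completion signal could read the summaries and ignore the element list, which matches the point that an unconsumed output should cost little or nothing.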
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I believe that the Wait transform turns this output into a side input,
>>>>> so outputting the input PCollection might be problematic.
>>>>>
>>>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>>>> just reading really quickly so sorry if I missed something...
>>>>>>
>>>>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>>>>
>>>>>> @Setup
>>>>>> public void setup() {
>>>>>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>>>> }
>>>>>>
>>>>>> @ProcessElement
>>>>>> public void processElement(ProcessContext c) throws
>>>>>>     ExecutionException, InterruptedException {
>>>>>>   writer.mutate(c.element());
>>>>>> }
>>>>>>
>>>>>> @Teardown
>>>>>> public void teardown() throws Exception {
>>>>>>   writer.close();
>>>>>>   writer = null;
>>>>>> }
>>>>>>
>>>>>> It is only in writer.close() that all async writes are waited on.
>>>>>> This needs to happen in @FinishBundle.
>>>>>>
>>>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>>>
>>>>>> Until you have waited on the future, you should not output the
>>>>>> element as "has been written". And you cannot output from the @Teardown
>>>>>> method, which is just for cleaning up resources.
>>>>>>
>>>>>> Am I reading this wrong?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> How about a PCollection containing every element which was
>>>>>>> successfully written?
>>>>>>> Basically the same things which were passed into it.
>>>>>>>
>>>>>>> Then you could act on every element after it's been successfully
>>>>>>> written to the sink.
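The ordering Kenn asks for above (start async writes per element, wait on them at the end of the bundle, and only then emit the element as written) can be sketched in plain Java. This is not the CassandraIO code and has no Beam dependency. FlushOnFinishBundleSketch and BufferingWriter are hypothetical names, and processElement / finishBundle are ordinary methods standing in for the @ProcessElement and @FinishBundle hooks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java stand-in for a DoFn lifecycle; all names are hypothetical. */
final class FlushOnFinishBundleSketch {

  /** Buffers async writes per bundle and emits an element only after its write completed. */
  static final class BufferingWriter<T> {
    private final Function<T, CompletableFuture<Void>> saveAsync;
    private final List<T> elements = new ArrayList<>();
    private final List<CompletableFuture<Void>> futures = new ArrayList<>();

    BufferingWriter(Function<T, CompletableFuture<Void>> saveAsync) {
      this.saveAsync = saveAsync;
    }

    /** Stand-in for @ProcessElement: start the write, emit nothing yet. */
    void processElement(T element) {
      elements.add(element);
      futures.add(saveAsync.apply(element));
    }

    /** Stand-in for @FinishBundle: wait on each pending write, then emit its element. */
    void finishBundle(Consumer<T> output) {
      try {
        for (int i = 0; i < futures.size(); i++) {
          // join() throws CompletionException if the write failed,
          // which fails the bundle before the element is emitted.
          futures.get(i).join();
          output.accept(elements.get(i));
        }
      } finally {
        // The buffers belong to one bundle only.
        elements.clear();
        futures.clear();
      }
    }
  }

  public static void main(String[] args) {
    List<String> emitted = new ArrayList<>();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    // Fake async save: hands back a future that the caller completes later.
    BufferingWriter<String> writer = new BufferingWriter<>(element -> {
      CompletableFuture<Void> future = new CompletableFuture<>();
      pending.add(future);
      return future;
    });
    writer.processElement("a");
    writer.processElement("b");
    System.out.println("emitted before finishBundle: " + emitted);
    pending.forEach(future -> future.complete(null));
    writer.finishBundle(emitted::add);
    System.out.println("emitted after finishBundle: " + emitted);
  }
}
```

Nothing is emitted from a teardown-style method here; resource cleanup would stay separate from output, which is the distinction drawn above.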
>>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +dev
>>>>>>>>>
>>>>>>>>> Since we all agree that we should return something different than
>>>>>>>>> PDone, the real question is what we should return.
>>>>>>>>>
>>>>>>>>
>>>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>>>> internally, of something contentless like nulls. This is future
>>>>>>>> compatible with returning something more meaningful based on the
>>>>>>>> source or write process itself, but at least this would be followable.
>>>>>>>>
>>>>>>>>
>>>>>>>>> As a reminder, we had a pretty interesting discussion about this
>>>>>>>>> already in the past, but uniformization of our return values has not
>>>>>>>>> happened.
>>>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>>>> contribute Write transforms that return.
>>>>>>>>>
>>>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>>>
>>>>>>>>
>>>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> > changing it now would be backwards incompatible.
>>>>>>>>>
>>>>>>>>> Periodic reminder: most IOs are still Experimental, so I suppose it
>>>>>>>>> is up to the maintainers to judge whether the upgrade to return
>>>>>>>>> something other than PDone is worthwhile; in that case we can
>>>>>>>>> deprecate and remove the previous signature in a short time (2
>>>>>>>>> releases was the average for previous cases).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>>>>> <aromanenko....@gmail.com> wrote:
>>>>>>>>> >
>>>>>>>>> > I thought that was said about returning a PCollection of write
>>>>>>>>> > results, as is done in other IOs (the ones I mentioned as examples)
>>>>>>>>> > that have _additional_ write methods, like “withWriteResults()” etc.,
>>>>>>>>> > that return PTransform<…, PCollection<WriteResults>>.
>>>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>>>> > functionality. Though, we need to follow the same pattern for the user
>>>>>>>>> > API, and maybe even the naming, for this feature across different IOs
>>>>>>>>> > (like we have for “readAll()” methods).
>>>>>>>>> >
>>>>>>>>> > I agree that we have to avoid returning PDone for such cases.
>>>>>>>>> >
>>>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>>>>>>> > wrote:
>>>>>>>>> >
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> > changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>>>> > returning variants (probably as another option to the builders) that
>>>>>>>>> > compose well with Wait, etc. would be welcome.
>>>>>>>>> >
>>>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko
>>>>>>>>> > <aromanenko....@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> In this way, I think the “Wait” PTransform should work for you but,
>>>>>>>>> >> as was mentioned before, it doesn’t work with PDone, only with a
>>>>>>>>> >> PCollection as a signal.
>>>>>>>>> >>
>>>>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>>>>> >> great to contribute it back to Beam in the same way as was done for
>>>>>>>>> >> other IOs (for example, JdbcIO [1] or BigtableIO [2]).
>>>>>>>>> >>
>>>>>>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>>>>>>> >> with “Wait”, because this pattern is quite often required.
>>>>>>>>> >>
>>>>>>>>> >> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>>>> >> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>>>> >>
>>>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez
>>>>>>>>> >> <vincent.marq...@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> No, it only needs to ensure that any record seen on Pubsub has
>>>>>>>>> >> been successfully written to a database. So "record by record" is
>>>>>>>>> >> fine, or even "bundle".
>>>>>>>>> >>
>>>>>>>>> >> ~Vincent
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko
>>>>>>>>> >> <aromanenko....@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Do you want to wait until ALL records are written to Cassandra
>>>>>>>>> >>> and then write all successfully written records to PubSub, or
>>>>>>>>> >>> should it be performed "record by record"?
>>>>>>>>> >>>
>>>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez
>>>>>>>>> >>> <vincent.marq...@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>>>>> >>> PubSubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>>>> >>>
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>>>> >>>                            |
>>>>>>>>> >>>                             -> PubsubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> Because I need to ensure that only items written to Pubsub
>>>>>>>>> >>> have successfully finished a (quorum) write.
>>>>>>>>> >>>
>>>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>>>>> >>> actually use it here so I often roll my own 'writer', but maybe
>>>>>>>>> >>> there is a recommended way of doing this?
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks in advance for any help.
>>>>>>>>> >>>
>>>>>>>>> >>> ~Vincent