On Thu, Mar 25, 2021 at 12:55 PM Robert Bradshaw <rober...@google.com>
wrote:

> On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez <vincent.marq...@gmail.com>
> wrote:
>
>>
>> *~Vincent*
>>
>>
>> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> The reason I was checking out the code is that sometimes a natural thing
>>> to output would be a summary of what was written. So each chunk of writes
>>> and the final chunk written in @FinishBundle. This is, for example, what
>>> SQL engines do (output # of rows written).
>>>
>>> You could output both the summary and the full list of written elements
>>> to different outputs, and users can choose. Outputs that are never consumed
>>> should be very low or zero cost.n
>>>
>>>
>> I like this approach.  I would much prefer two outputs (one of which is
>> all elements written) to returning an existential/wildcard PCollection.
>>
>
> +1, this would work well too. Returning a PCollectionTuple is extensible
> too, as one could add more (or better) outputs in the future without
> changing the signature.
>

This comment is dangerously close to sparking a philosophical conversation!

Kenn


>
>>
>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Yeah, the entire input is not always what is needed, and can generally
>>>> be achieved via
>>>>
>>>>     input -> wait(side input of write) -> do something with the input
>>>>
>>>> Of course one could also do
>>>>
>>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>>> CombineGlobally(TrivialCombineFn)
>>>>
>>>> to reduce this to a more minimal set with at least one element per
>>>> Window.
>>>>
>>>> The file writing operations emit the actual files that were written,
>>>> which can be handy. My suggestion of PCollection<?> was just so that we can
>>>> emit something usable, and decide exactly what is the most useful is later.
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I believe that the Wait transform turns this output into a side input,
>>>>> so outputting the input PCollection might be problematic.
>>>>>
>>>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>>>> just reading really quickly so sorry if I missed something...
>>>>>>
>>>>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>       public void processElement(ProcessContext c) throws
>>>>>> ExecutionException, InterruptedException {
>>>>>>       writer.mutate(c.element());
>>>>>>     }
>>>>>>
>>>>>>     @Teardown
>>>>>>     public void teardown() throws Exception {
>>>>>>       writer.close();
>>>>>>       writer = null;
>>>>>>     }
>>>>>>
>>>>>> It is only in writer.close() that all async writes are waited on.
>>>>>> This needs to happen in @FinishBundle.
>>>>>>
>>>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>>>
>>>>>> Until you have waited on the future, you should not output the
>>>>>> element as "has been written". And you cannot output from the @TearDown
>>>>>> method which is just for cleaning up resources.
>>>>>>
>>>>>> Am I reading this wrong?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> How about a PCollection containing every element which was
>>>>>>> successfully written?
>>>>>>> Basically the same things which were passed into it.
>>>>>>>
>>>>>>> Then you could act on every element after its been successfully
>>>>>>> written to the sink.
>>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +dev
>>>>>>>>>
>>>>>>>>> Since we all agree that we should return something different than
>>>>>>>>> PDone the real question is what should we return.
>>>>>>>>>
>>>>>>>>
>>>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>>>> internally, of something contentless like nulls. This is future 
>>>>>>>> compatible
>>>>>>>> with returning something more maningful based on the source source or 
>>>>>>>> write
>>>>>>>> process itself, but at least this would be followable.
>>>>>>>>
>>>>>>>>
>>>>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>>>>> already in the past but uniformization of our return values has not
>>>>>>>>> happened.
>>>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>>>> contribute Write transforms that return.
>>>>>>>>>
>>>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>>>
>>>>>>>>
>>>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> changing it now would be backwards incompatible.
>>>>>>>>>
>>>>>>>>> Periodic reminder most IOs are still Experimental so I suppose it
>>>>>>>>> is
>>>>>>>>> worth to the maintainers to judge if the upgrade to return
>>>>>>>>> someething
>>>>>>>>> different of PDone is worth, in that case we can deprecate and
>>>>>>>>> remove
>>>>>>>>> the previous signature in short time (2 releases was the average
>>>>>>>>> for
>>>>>>>>> previous cases).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>>>>> <aromanenko....@gmail.com> wrote:
>>>>>>>>> >
>>>>>>>>> > I thought that was said about returning a PCollection of write
>>>>>>>>> results as it’s done in other IOs (as I mentioned as examples) that 
>>>>>>>>> have
>>>>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>>>> funtionality. Though, we need to follow the same pattern for user API 
>>>>>>>>> and
>>>>>>>>> maybe even naming for this feature across different IOs (like we have 
>>>>>>>>> for
>>>>>>>>> "readAll()” methods).
>>>>>>>>> >
>>>>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>>>>> >
>>>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>>>> returning variants (probably as another option to the builders) that
>>>>>>>>> compose well with Wait, etc. would be welcome.
>>>>>>>>> >
>>>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> In this way, I think “Wait” PTransform should work for you but,
>>>>>>>>> as it was mentioned before, it doesn’t work with PDone, only with
>>>>>>>>> PCollection as a signal.
>>>>>>>>> >>
>>>>>>>>> >> Since you already adjusted your own writer for that, it would
>>>>>>>>> be great to contribute it back to Beam in the way as it was done for 
>>>>>>>>> other
>>>>>>>>> IOs (for example, JdbcIO [1] or BigtableIO [2])
>>>>>>>>> >>
>>>>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>>>>> >>
>>>>>>>>> >> [1]
>>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>>>> >> [2]
>>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>>>> >>
>>>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>>>>> vincent.marq...@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>>>>> successfully written to a database.  So "record by record" is fine, 
>>>>>>>>> or even
>>>>>>>>> "bundle".
>>>>>>>>> >>
>>>>>>>>> >> ~Vincent
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Do you want to wait for ALL records are written for Cassandra
>>>>>>>>> and then write all successfully written records to PubSub or it 
>>>>>>>>> should be
>>>>>>>>> performed "record by record"?
>>>>>>>>> >>>
>>>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>>>>> vincent.marq...@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>>>>> PubSubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>>>> >>>
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>>>> >>>                                                          |
>>>>>>>>> >>>                                                           ->
>>>>>>>>> PubsubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> Because I need to ensure that only items written to Pubsub
>>>>>>>>> have successfully finished a (quorum) write.
>>>>>>>>> >>>
>>>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>>>>> actually use it here so I often roll my own 'writer', but maybe there 
>>>>>>>>> is a
>>>>>>>>> recommended way of doing this?
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks in advance for any help.
>>>>>>>>> >>>
>>>>>>>>> >>> ~Vincent
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>

Reply via email to