The reason I was checking out the code is that sometimes a natural thing to
output would be a summary of what was written: one summary for each chunk of
writes, plus one for the final chunk written in @FinishBundle. This is, for
example, what SQL engines do (output # of rows written).

You could output both the summary and the full list of written elements to
different outputs, and users can choose. Outputs that are never consumed
should be very low or zero cost.
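A minimal plain-Java model of the chunked-summary idea, written against no Beam API. Methods named after @ProcessElement and @FinishBundle stand in for the DoFn lifecycle, and two lists stand in for two tagged outputs. The class name ChunkedWriter, the chunk size and the blocking sink callback are hypothetical. In a real DoFn, anything emitted from @FinishBundle goes through FinishBundleContext, which requires an explicit timestamp and window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model, not Beam API: a batching write step with two outputs,
// the full list of written elements and one summary count per flushed chunk.
final class ChunkedWriter<T> {
  private final int chunkSize;
  // Hypothetical blocking sink call that writes one chunk (e.g. a batch insert).
  private final Consumer<List<T>> sink;
  private final List<T> buffer = new ArrayList<>();
  // Stand-ins for two tagged outputs; a pipeline may consume one, both, or neither.
  final List<T> writtenElements = new ArrayList<>();
  final List<Integer> chunkSummaries = new ArrayList<>();

  ChunkedWriter(int chunkSize, Consumer<List<T>> sink) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    this.chunkSize = chunkSize;
    this.sink = sink;
  }

  // Like @ProcessElement: buffer the element and flush whenever a chunk is full.
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= chunkSize) {
      flush();
    }
  }

  // Like @FinishBundle: the last, possibly partial, chunk is written here.
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    List<T> chunk = new ArrayList<>(buffer);
    sink.accept(chunk); // elements are reported only after the sink call returns
    writtenElements.addAll(chunk);
    chunkSummaries.add(chunk.size()); // like a SQL engine's "N rows written"
    buffer.clear();
  }
}
```

A consumer that only needs row counts reads chunkSummaries; one that needs per-element confirmation reads writtenElements.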

Kenn

On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <rober...@google.com> wrote:

> Yeah, the entire input is not always what is needed, and when it is, it can
> generally be obtained via
>
>     input -> wait(side input of write) -> do something with the input
>
> Of course one could also do
>
> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
> CombineGlobally(TrivialCombineFn)
>
> to reduce this to a minimal set with at least one element per window.
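To see why the combine step shrinks the signal, here is a plain-Java analogy (not Beam API) in which a window is just a string label, an assumption made for the sketch. Grouping by window and counting leaves one element per window, however large the input was.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogy, not Beam API: a "window" is only a label in this sketch.
final class WindowSignals {
  // Hypothetical stand-in for an element together with the window it belongs to.
  record Windowed<T>(String window, T value) {}

  // Collapse each window's elements to a single count: one signal element per
  // window, the same shape a global combine produces, regardless of input size.
  static <T> Map<String, Long> oneSignalPerWindow(List<Windowed<T>> elements) {
    return elements.stream()
        .collect(Collectors.groupingBy(Windowed::window, TreeMap::new, Collectors.counting()));
  }
}
```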
>
> The file writing operations emit the actual files that were written, which
> can be handy. My suggestion of PCollection<?> was just so that we can emit
> something usable now, and decide later exactly what is most useful.
>
>
> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>
>> I believe that the Wait transform turns this output into a side input, so
>> outputting the input PCollection might be problematic.
>>
>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>> just reading really quickly so sorry if I missed something...
>>>
>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>
>>>     @Setup
>>>     public void setup() {
>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>     }
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c)
>>>         throws ExecutionException, InterruptedException {
>>>       writer.mutate(c.element());
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() throws Exception {
>>>       writer.close();
>>>       writer = null;
>>>     }
>>>
>>> It is only in writer.close() that all async writes are waited on. This
>>> needs to happen in @FinishBundle.
>>>
>>> Did you discover this when implementing your own Cassandra.Write?
>>>
>>> Until you have waited on the future, you should not output the element
>>> as "has been written". And you cannot output from the @Teardown method,
>>> which is just for cleaning up resources.
>>>
>>> Am I reading this wrong?
>>>
>>> Kenn
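A plain-Java model of the corrected lifecycle, assuming a hypothetical async client call in place of Mapper::saveAsync: writes start in processElement, are awaited in finishBundle, and only then are elements reported as written, while teardown just releases the client. The names are illustrative and are not CassandraIO's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java model, not Beam API: methods mirror the DoFn lifecycle annotations.
final class AsyncWriteFnModel<T> {
  // Hypothetical async client call standing in for Mapper::saveAsync.
  private final Function<T, CompletableFuture<Void>> saveAsync;
  private final List<T> pendingElements = new ArrayList<>();
  private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();
  // Stand-in for the DoFn's output: elements confirmed as written.
  final List<T> output = new ArrayList<>();
  boolean clientOpen = true;

  AsyncWriteFnModel(Function<T, CompletableFuture<Void>> saveAsync) {
    this.saveAsync = saveAsync;
  }

  // Like @ProcessElement: start the write, but do not output yet (it may still fail).
  void processElement(T element) {
    pendingElements.add(element);
    pendingWrites.add(saveAsync.apply(element));
  }

  // Like @FinishBundle: wait for every write started in this bundle. If any failed,
  // join() throws a CompletionException and nothing is output, so the bundle fails
  // instead of reporting unwritten elements.
  void finishBundle() {
    CompletableFuture.allOf(pendingWrites.toArray(new CompletableFuture<?>[0])).join();
    output.addAll(pendingElements);
    pendingElements.clear();
    pendingWrites.clear();
  }

  // Like @Teardown: only release resources; no output happens here.
  void teardown() {
    clientOpen = false;
  }
}
```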
>>>
>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
>>>
>>>> How about a PCollection containing every element which was successfully
>>>> written?
>>>> Basically the same things which were passed into it.
>>>>
>>>> Then you could act on every element after it's been successfully written
>>>> to the sink.
>>>>
>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> +dev
>>>>>>
>>>>>> Since we all agree that we should return something other than
>>>>>> PDone, the real question is what we should return.
>>>>>>
>>>>>
>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>> internally, of something contentless like nulls. This is future compatible
>>>>> with returning something more meaningful based on the source or write
>>>>> process itself, but at least this would be followable (other transforms
>>>>> could be sequenced after it).
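A plain-Java analogy for why a wildcard return type is future compatible, with List standing in for PCollection and hypothetical class and method names. Callers typed against List<?> can only count or sequence on the result, and List<?> and a later List<WriteResult> both erase to List, so changing the element type later should generally not break them.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java analogy, not Beam API: List stands in for PCollection.
final class SignalingWrite {
  // Hypothetical write returning one contentless (null) element per record.
  // The wildcard promises callers nothing about the elements, so a later
  // version could return List<WriteResult> instead; both erase to List.
  static List<?> write(List<String> records) {
    // The actual writes are omitted in this sketch.
    return Collections.nCopies(records.size(), null);
  }

  // A follower that depends only on the signal existing, never on its contents.
  static int countWrites(List<?> signal) {
    return signal.size();
  }
}
```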
>>>>>
>>>>>
>>>>>> As a reminder, we already had a pretty interesting discussion about
>>>>>> this in the past, but the uniformization of our return values has not
>>>>>> happened.
>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>> contribute Write transforms that return a result.
>>>>>>
>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>
>>>>>
>>>>> Yeah, we should go ahead and finally do something.
>>>>>
>>>>>
>>>>>>
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> > changing it now would be backwards incompatible.
>>>>>>
>>>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>>>> up to the maintainers to judge whether upgrading to return something
>>>>>> other than PDone is worth it. If so, we can deprecate and remove the
>>>>>> previous signature in a short time (2 releases was the average for
>>>>>> previous cases).
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>> <aromanenko....@gmail.com> wrote:
>>>>>> >
>>>>>> > I thought this was about returning a PCollection of write results,
>>>>>> > as is done in other IOs (the ones I mentioned as examples) that have
>>>>>> > _additional_ write methods, like "withWriteResults()", that return
>>>>>> > PTransform<…, PCollection<WriteResults>>.
>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>> > functionality. However, we need to follow the same pattern for the
>>>>>> > user API, and maybe even the naming for this feature, across different
>>>>>> > IOs (like we have for "readAll()" methods).
>>>>>> >
>>>>>> > I agree that we have to avoid returning PDone for such cases.
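A sketch of the backwards-compatible shape described here, in plain Java with hypothetical names (ExampleIO, Done as a PDone stand-in, WriteResult): the existing write path keeps its signature, and an opt-in withWriteResults() returns a variant whose output downstream steps can depend on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical IO in plain Java, not Beam API: Done stands in for PDone and
// List stands in for PCollection.
final class ExampleIO {
  // Carries nothing a downstream step could wait on, like PDone.
  static final class Done {}

  // Hypothetical per-record result, analogous to a WriteResults element.
  record WriteResult(String key) {}

  static Write write(Consumer<String> sink) {
    return new Write(sink);
  }

  static final class Write {
    private final Consumer<String> sink;

    Write(Consumer<String> sink) {
      this.sink = sink;
    }

    // Existing signature is unchanged, so current callers keep compiling.
    Done expand(List<String> input) {
      input.forEach(sink);
      return new Done();
    }

    // Opt-in variant with the same configuration but a followable output.
    WithResults withWriteResults() {
      return new WithResults(sink);
    }
  }

  static final class WithResults {
    private final Consumer<String> sink;

    WithResults(Consumer<String> sink) {
      this.sink = sink;
    }

    List<WriteResult> expand(List<String> input) {
      List<WriteResult> results = new ArrayList<>();
      for (String element : input) {
        sink.accept(element); // a failing write throws before its result is emitted
        results.add(new WriteResult(element));
      }
      return results;
    }
  }
}
```

Keeping the new behavior behind a separate builder method is what lets the old return type survive unchanged.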
>>>>>> >
>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> > changing it now would be backwards incompatible. PRs to add
>>>>>> > non-PDone-returning variants (probably as another option to the
>>>>>> > builders) that compose well with Wait, etc. would be welcome.
>>>>>> >
>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>> aromanenko....@gmail.com> wrote:
>>>>>> >>
>>>>>> >> In this case, I think the "Wait" PTransform should work for you,
>>>>>> >> but, as was mentioned before, it doesn't work with PDone, only with
>>>>>> >> a PCollection as a signal.
>>>>>> >>
>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>> >> great to contribute it back to Beam in the same way as was done for
>>>>>> >> other IOs (for example, JdbcIO [1] or BigtableIO [2]).
>>>>>> >>
>>>>>> >> In general, I think we need to have it for all IOs, at least for use
>>>>>> >> with "Wait", because this pattern is quite often required.
>>>>>> >>
>>>>>> >> [1]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>> >> [2]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>> >>
>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>> vincent.marq...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> No, it only needs to ensure that each record seen on Pubsub has
>>>>>> >> been successfully written to a database. So "record by record" is
>>>>>> >> fine, or even per "bundle".
>>>>>> >>
>>>>>> >> ~Vincent
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>> aromanenko....@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Do you want to wait until ALL records are written to Cassandra and
>>>>>> >>> then write all successfully written records to PubSub, or should it
>>>>>> >>> be performed "record by record"?
>>>>>> >>>
>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>> vincent.marq...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubsubIO.write
>>>>>> >>>
>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>> >>>
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>> >>>                            |
>>>>>> >>>                            -> PubsubIO.write
>>>>>> >>>
>>>>>> >>> Because I need to ensure that the only items written to Pubsub are
>>>>>> >>> those that have successfully finished a (quorum) write.
>>>>>> >>>
>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone>, I can't actually
>>>>>> >>> use it here, so I often roll my own 'writer', but maybe there is a
>>>>>> >>> recommended way of doing this?
>>>>>> >>>
>>>>>> >>> Thanks in advance for any help.
>>>>>> >>>
>>>>>> >>> ~Vincent
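The ordering asked for here, reduced to a single record in plain Java: the publish is chained onto the success of that record's own write, so a failed write is never published. Both client calls are hypothetical functions returning CompletableFuture. Inside Beam, the cross-transform version of this guarantee is what a write that returns a PCollection, combined with Wait, is meant to provide at window granularity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java model, not Beam API: "publish only what was durably written".
final class WriteThenPublish {
  static <T> CompletableFuture<Void> writeThenPublish(
      T record,
      Function<T, CompletableFuture<Void>> quorumWrite, // hypothetical database client call
      Function<T, CompletableFuture<Void>> publish) { // hypothetical Pub/Sub client call
    // thenCompose runs publish only if the write completed normally; a failed
    // write skips the publish and the returned future completes exceptionally.
    return quorumWrite.apply(record).thenCompose(ignored -> publish.apply(record));
  }
}
```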
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
