On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org> wrote:

> Alex's idea sounds good, and it may be what Vincent implemented. I am just
> reading really quickly, so sorry if I missed something...
>
> Checking out the code for the WriteFn<T> I see a big problem:
>
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c)
>         throws ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @Teardown method, which
> is just for cleaning up resources.
>
>
I didn't use an async call; I did a blocking write. I actually think using
Futures/async to write here is an anti-pattern, because it prevents you from
linearizing your writes. That is often necessary at high throughput, with
many millions of updates, so that you don't overload a specific shard/core.
I'm eager to hear more if my reasoning isn't correct, though.
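
To make the two options concrete, here is a minimal sketch in plain Java. It
is not Beam code and not the CassandraIO implementation: the class
BundleWriteSketch and the saveAsync function are hypothetical names, and the
methods only mirror the @ProcessElement / @FinishBundle lifecycle discussed
above. It shows the rule Kenn describes: an element is reported as written
only after its future has been waited on, at the end of the bundle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java model of a DoFn-style writer. All names are hypothetical.
class BundleWriteSketch<T> {
  // Hypothetical async write, standing in for something like Mapper::saveAsync.
  private final Function<T, CompletableFuture<Void>> saveAsync;
  private final List<T> pendingElements = new ArrayList<>();
  private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

  BundleWriteSketch(Function<T, CompletableFuture<Void>> saveAsync) {
    this.saveAsync = saveAsync;
  }

  // Models @ProcessElement: start the write, but do not report the element yet.
  void processElement(T element) {
    pendingElements.add(element);
    pendingWrites.add(saveAsync.apply(element));
  }

  // Models @FinishBundle: wait for every write started in this bundle, then
  // return the elements that may be emitted downstream as "written".
  List<T> finishBundle() {
    try {
      for (CompletableFuture<Void> write : pendingWrites) {
        write.join(); // throws CompletionException if the write failed
      }
      return new ArrayList<>(pendingElements);
    } finally {
      // Reset per-bundle state whether or not the writes succeeded.
      pendingElements.clear();
      pendingWrites.clear();
    }
  }

  public static void main(String[] args) {
    List<String> store = Collections.synchronizedList(new ArrayList<>());
    BundleWriteSketch<String> writer =
        new BundleWriteSketch<>(e -> CompletableFuture.runAsync(() -> store.add(e)));
    writer.processElement("a");
    writer.processElement("b");
    System.out.println(writer.finishBundle());
  }
}
```

A blocking write is the degenerate case of the same sketch: if saveAsync
performs the write synchronously and returns an already completed future,
writes happen one at a time in processElement and finishBundle has nothing
left to wait for.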




> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after it has been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> +dev
>>>>
>>>> Since we all agree that we should return something other than
>>>> PDone, the real question is what we should return.
>>>>
>>>
>>> My proposal is that one returns a PCollection<?> that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more meaningful based on the source or write
>>> process itself, but at least this would be followable.
>>>
>>>
>>>> As a reminder we had a pretty interesting discussion about this
>>>> already in the past but uniformization of our return values has not
>>>> happened.
>>>> This thread is worth reading for Vincent or anyone who wants to
>>>> contribute Write transforms that return.
>>>>
>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>
>>>>
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible.
>>>>
>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>> up to the maintainers to judge whether the upgrade to return something
>>>> other than PDone is worthwhile. In that case we can deprecate and remove
>>>> the previous signature in a short time (2 releases was the average in
>>>> previous cases).
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>> <aromanenko....@gmail.com> wrote:
>>>> >
>>>> > I thought that was about returning a PCollection of write results, as
>>>> it’s done in other IOs (the ones I mentioned as examples) that have
>>>> _additional_ write methods, like “withWriteResults()” etc., that return
>>>> PTransform<…, PCollection<WriteResults>>.
>>>> > In this case, we keep backwards compatibility and just add new
>>>> functionality. Though, we need to follow the same pattern for the user API
>>>> and maybe even the naming for this feature across different IOs (like we
>>>> have for “readAll()” methods).
>>>> >
>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>> >
>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>> >
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>> returning variants (probably as another option to the builders) that
>>>> compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> aromanenko....@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> great to contribute it back to Beam the way it was done for other IOs
>>>> (for example, JdbcIO [1] or BigtableIO [2]).
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> with “Wait”, because this pattern is quite often required.
>>>> >>
>>>> >> [1]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>> >>
>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>> been successfully written to a database. So "record by record" is fine, or
>>>> even "bundle".
>>>> >>
>>>> >> ~Vincent
>>>> >>
>>>> >>
>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>> aromanenko....@gmail.com> wrote:
>>>> >>>
>>>> >>> Do you want to wait until ALL records are written to Cassandra and
>>>> then write all successfully written records to PubSub, or should it be
>>>> performed "record by record"?
>>>> >>>
>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>> vincent.marq...@gmail.com> wrote:
>>>> >>>
>>>> >>> I have a common use case where my pipeline looks like this:
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubsubIO.write
>>>> >>>
>>>> >>> I do NOT want my pipeline to look like the following:
>>>> >>>
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>> >>>                                  |
>>>> >>>                                   -> PubsubIO.write
>>>> >>>
>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>> successfully finished a (quorum) write.
>>>> >>>
>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>> recommended way of doing this?
>>>> >>>
>>>> >>> Thanks in advance for any help.
>>>> >>>
>>>> >>> ~Vincent
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
