[ 
https://issues.apache.org/jira/browse/BEAM-13203?focusedWorklogId=719999&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-719999
 ]

ASF GitHub Bot logged work on BEAM-13203:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Feb/22 10:26
            Start Date: 03/Feb/22 10:26
    Worklog Time Spent: 10m 
      Work Description: mosche commented on pull request #16711:
URL: https://github.com/apache/beam/pull/16711#issuecomment-1028839651


   R: @aromanenko-dev 


-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 719999)
    Time Spent: 20m  (was: 10m)

> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
>                 Key: BEAM-13203
>                 URL: https://issues.apache.org/jira/browse/BEAM-13203
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P1
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> This needs to be investigated: reading the code suggests we might be losing
> data under certain conditions, e.g. when terminating the pipeline. The async
> processing model here is far too simplistic.
> The bundle never knows about pending writes and never blocks to wait for
> them. Likewise, exceptions are thrown into nowhere: they are raised on the
> client's callback thread, where nothing observes them. Test cases don't
> capture this because they operate on already-completed futures only, so the
> callbacks run (and their exceptions are thrown) on the thread of processElement.
> {code:java}
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> }); {code}
> Also, this entirely removes backpressure from a stream. When used with a much
> faster source, memory usage keeps growing, because the number of concurrent
> pending async operations is not limited.
> Spotify's scio contains a 
> [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java]
>  that illustrates how it can be done.
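The issue only points to scio's JavaAsyncDoFn for a fix. As a rough illustration of the two missing pieces (waiting for pending writes before the bundle completes, and bounding the number of in-flight requests), here is a minimal plain-Java sketch. It is not Beam or scio code: `BoundedAsyncWriter`, `submit`, `flush` and `maxInFlight` are hypothetical names, and the `publish` function is assumed to stand in for something like `client.publish(publishRequest)` returning a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical helper, not part of Beam or scio: bounds the number of in-flight
// async requests and lets the caller wait for all of them before a bundle ends.
class BoundedAsyncWriter<T, R> {
  private final Function<T, CompletableFuture<R>> publish;
  private final Semaphore permits;
  // Only touched from the calling (bundle) thread.
  private final List<CompletableFuture<R>> pending = new ArrayList<>();
  // Filled from callback threads, drained by flush().
  private final Queue<R> results = new ConcurrentLinkedQueue<>();
  private final AtomicReference<Throwable> firstError = new AtomicReference<>();

  BoundedAsyncWriter(Function<T, CompletableFuture<R>> publish, int maxInFlight) {
    this.publish = publish;
    this.permits = new Semaphore(maxInFlight);
  }

  // Per element (e.g. from processElement): blocks while maxInFlight requests are pending.
  void submit(T element) throws InterruptedException {
    throwIfFailed();
    permits.acquire(); // backpressure: wait for a free slot
    CompletableFuture<R> future;
    try {
      future = publish.apply(element);
    } catch (RuntimeException e) {
      permits.release();
      throw e;
    }
    pending.add(future.whenComplete((response, ex) -> {
      try {
        if (ex != null) {
          firstError.compareAndSet(null, ex); // record the failure instead of throwing into nowhere
        } else if (response != null) {
          results.add(response);
        }
      } finally {
        permits.release(); // free the slot for the next submit()
      }
    }));
  }

  // Before the bundle completes (e.g. from finishBundle): waits for every pending
  // request, then rethrows the first failure or returns the collected responses.
  List<R> flush() {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
        .handle((ignored, ex) -> null) // wait for all; failures are reported via firstError
        .join();
    pending.clear();
    throwIfFailed();
    List<R> out = new ArrayList<>();
    R r;
    while ((r = results.poll()) != null) {
      out.add(r);
    }
    return out;
  }

  private void throwIfFailed() {
    Throwable t = firstError.get();
    if (t != null) {
      throw new RuntimeException("Async publish failed", t);
    }
  }
}
```

In a DoFn, `submit` would presumably be called from `@ProcessElement` and `flush` from `@FinishBundle`, so a failed publish fails the bundle instead of only being logged. Responses are collected rather than output from the callback thread; emitting them from `@FinishBundle` would additionally need a timestamp and window, which this sketch does not handle. The blocking `permits.acquire()` is what restores backpressure: the calling thread stalls once `maxInFlight` requests are pending.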



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
