Moritz Mack created BEAM-13203:
----------------------------------

             Summary: Potential data loss when using SnsIO.writeAsync
                 Key: BEAM-13203
                 URL: https://issues.apache.org/jira/browse/BEAM-13203
             Project: Beam
          Issue Type: Bug
          Components: io-java-aws
            Reporter: Moritz Mack


This needs to be investigated: reading the code suggests we might be losing 
data under certain conditions, e.g. when terminating the pipeline. The async 
processing model is far too simplistic.

The bundle never learns about pending writes and does not block to wait for 
any of them to complete. In the same way, exceptions thrown in the callback 
are lost. Test cases don't catch this because they operate on already 
completed futures only (so exceptions in the callbacks get thrown on the 
thread of processElement).
{code:java}
client.publish(publishRequest).whenComplete((response, ex) -> {
  if (ex == null) {
    SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
    context.output(snsResponse);
  } else {
    LOG.error("Error while publishing request to SNS", ex);
    throw new SnsWriteException("Error while publishing request to SNS", ex);
  }
});
{code}
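One possible direction, sketched with plain JDK classes rather than Beam APIs: remember the futures started within a bundle and wait for all of them before the bundle finishes, so that failures surface on the calling thread. PendingWrites, track and awaitAll are hypothetical names. The assumption is that track would be called from processElement and awaitAll from a finishBundle method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper, not part of Beam: tracks async writes started within one bundle. */
class PendingWrites<R> {
  // Not thread-safe: assumes track() and awaitAll() are called from the same thread.
  private final List<CompletableFuture<R>> pending = new ArrayList<>();

  /** Remember the future instead of attaching a fire-and-forget callback. */
  void track(CompletableFuture<R> future) {
    pending.add(future);
  }

  /**
   * Blocks until every tracked write has completed. If any write failed,
   * join() throws a CompletionException on the calling thread.
   */
  List<R> awaitAll() {
    List<CompletableFuture<R>> batch = new ArrayList<>(pending);
    pending.clear();
    // allOf completes only after all futures are done, successfully or not.
    CompletableFuture.allOf(batch.toArray(new CompletableFuture<?>[0])).join();
    List<R> results = new ArrayList<>();
    for (CompletableFuture<R> future : batch) {
      results.add(future.join()); // already complete, does not block
    }
    return results;
  }
}
```

With something along these lines, the results could be emitted after awaitAll returns instead of from inside the callback.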
This also removes backpressure from the stream entirely. When used with a 
much faster source, memory usage keeps growing because the number of 
concurrent pending async operations is not limited.
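A minimal sketch of how the number of pending operations could be capped, again using only JDK classes: a semaphore makes the caller block once a limit is reached, which restores backpressure. BoundedAsync, submit and maxInFlight are hypothetical names, and this is not how SnsIO currently behaves.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Beam: caps the number of in-flight async operations. */
class BoundedAsync {
  private final Semaphore permits;

  BoundedAsync(int maxInFlight) {
    this.permits = new Semaphore(maxInFlight);
  }

  /** Blocks the caller while maxInFlight operations are pending, slowing down a faster source. */
  <R> CompletableFuture<R> submit(Supplier<CompletableFuture<R>> operation) {
    // Uninterruptible acquire keeps the sketch short; real code may prefer acquire().
    permits.acquireUninterruptibly();
    try {
      // Release the permit once the operation completes, successfully or not.
      return operation.get().whenComplete((response, ex) -> permits.release());
    } catch (RuntimeException e) {
      permits.release(); // the operation never started
      throw e;
    }
  }

  /** Number of operations that can still be started without blocking. */
  int availablePermits() {
    return permits.availablePermits();
  }
}
```

A call such as submit(() -> client.publish(publishRequest)) would then block in processElement whenever too many publishes are outstanding.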



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
