[ 
https://issues.apache.org/jira/browse/BEAM-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948285#comment-15948285
 ] 

Thomas Groh commented on BEAM-1835:
-----------------------------------

This is actually a bug in PubsubUnboundedSource or PubsubCheckpoint. The 
{{finalizeCheckpoint}} documentation states that
"This finalize method may be called from any thread, concurrently with calls to 
{{UnboundedReader}} it was created from."
and
"It is not safe to assume the {{UnboundedReader}} from which this checkpoint 
was created still exists at the time this method is called."

The Runner is permitted to close the reader and finalize all outstanding 
checkpoints in any order, potentially interleaving the two operations. The 
checkpoint mark must therefore not assume that the client is still available, 
nor that a client which is available and open when the call to 
{{finalizeCheckpoint}} begins will remain so for the duration of the call.
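
As a rough illustration of that contract, here is a minimal sketch of a 
reader and checkpoint that tolerate being finalized after (or during) a close. 
It is not the actual PubsubUnboundedSource code: {{FakeClient}}, 
{{SketchReader}} and {{SketchCheckpoint}} are hypothetical stand-ins, and the 
use of an {{AtomicReference}} plus a boolean return value is an assumption 
about one possible fix, not the one that was adopted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the Pubsub client; not a real Beam class. */
class FakeClient {
  private boolean closed = false;
  final List<String> acked = new ArrayList<>();

  synchronized void acknowledge(List<String> ackIds) {
    if (closed) {
      throw new IllegalStateException("client closed");
    }
    acked.addAll(ackIds);
  }

  synchronized void close() {
    closed = true;
  }
}

/** Hypothetical reader that never exposes a half-closed client to other threads. */
class SketchReader {
  private final AtomicReference<FakeClient> client;

  SketchReader(FakeClient c) {
    this.client = new AtomicReference<>(c);
  }

  /** Returns true if the batch was acked, false if the reader was already closed. */
  boolean ackBatch(List<String> ackIds) {
    // Read the reference exactly once; a concurrent close() cannot null out 'local'.
    FakeClient local = client.get();
    if (local == null) {
      return false;
    }
    try {
      local.acknowledge(ackIds);
      return true;
    } catch (IllegalStateException e) {
      // The client was closed between the read above and the call.
      return false;
    }
  }

  void close() {
    // Swap the reference out atomically so only one caller closes the client.
    FakeClient local = client.getAndSet(null);
    if (local != null) {
      local.close();
    }
  }
}

/** Hypothetical checkpoint mark that treats a closed reader as a no-op. */
class SketchCheckpoint {
  private final SketchReader reader;
  private final List<String> ackIds;

  SketchCheckpoint(SketchReader reader, List<String> ackIds) {
    this.reader = reader;
    this.ackIds = ackIds;
  }

  boolean finalizeCheckpoint() {
    return reader.ackBatch(ackIds);
  }
}
```

In this sketch a checkpoint finalized after the close simply returns false 
instead of throwing. Whether skipping the ack is acceptable depends on the 
source; for Pubsub, messages that are never acked are redelivered once the ack 
deadline expires, so the cost would be duplicates rather than data loss.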

> NPE in DirectRunner PubsubReader.ackBatch
> -----------------------------------------
>
>                 Key: BEAM-1835
>                 URL: https://issues.apache.org/jira/browse/BEAM-1835
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>            Reporter: Rafal Wojdyla
>            Assignee: Rafal Wojdyla
>
> DirectRunner streaming mode throws null pointer exception:
> {noformat}
> org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:639)
>   at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:312)
>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:223)
>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:144)
>   at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
>   at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This does not always happen, but with a large enough number of events it is 
> fairly reproducible. The problem seems to be the concurrent reuse of a 
> reader among multiple threads, combined with a race condition when one of 
> the threads "decides" to close the reader, based on:
> {code}
> private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
> {code}
> The close method nulls out the Pubsub client:
> {code}
>     @Override
>     public void close() throws IOException {
>       if (pubsubClient != null) {
>         pubsubClient.close();
>         pubsubClient = null;
>       }
>     }
> {code}
> If the client is still in use by another thread, this results in the NPE 
> above.



