[ https://issues.apache.org/jira/browse/BEAM-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rafal Wojdyla updated BEAM-1835:
--------------------------------
    Summary: NPE in DirectRunner PubsubReader.ackBatch  (was: NPE in DirectRunner )

> NPE in DirectRunner PubsubReader.ackBatch
> -----------------------------------------
>
>                 Key: BEAM-1835
>                 URL: https://issues.apache.org/jira/browse/BEAM-1835
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>            Reporter: Rafal Wojdyla
>            Assignee: Rafal Wojdyla
>
> DirectRunner in streaming mode throws a NullPointerException:
> {noformat}
> org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:639)
>     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:312)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:223)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:144)
>     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
>     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This does not always happen, but with a large enough number of events it is
> fairly reproducible.
> The problem seems to be the concurrent reuse of a reader among multiple
> threads, combined with a race condition when one of the threads decides to
> close the reader, based on:
> {code}
> private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
> {code}
> The close nulls out the pubsub client:
> {code}
> @Override
> public void close() throws IOException {
>   if (pubsubClient != null) {
>     pubsubClient.close();
>     pubsubClient = null;
>   }
> }
> {code}
> If the client is still in use by another thread at that point, the result is
> the NPE above.
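
The race described above can be illustrated with a minimal, self-contained sketch. This is not the Beam code and not necessarily how the issue was fixed: `GuardedReader` and `FakeClient` are hypothetical stand-ins, and the sketch assumes it is acceptable for an ack that arrives after `close()` to be skipped instead of throwing. The idea is that `ackBatch` and `close` take the same lock, so the null check and the use of the client cannot be interleaved with the close.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class GuardedReaderSketch {

  /** Hypothetical stand-in for the pubsub client; not a Beam class. */
  static final class FakeClient implements Closeable {
    private int acked = 0;

    void acknowledge(List<String> ackIds) {
      acked += ackIds.size();
    }

    @Override
    public void close() {}
  }

  /** Hypothetical reader whose ackBatch and close share one lock. */
  static final class GuardedReader implements Closeable {
    private final Object lock = new Object();
    private FakeClient client = new FakeClient();

    /** Returns false (instead of throwing an NPE) if the reader is closed. */
    boolean ackBatch(List<String> ackIds) {
      synchronized (lock) {
        if (client == null) {
          return false; // closed by another thread; skip the ack
        }
        client.acknowledge(ackIds);
        return true;
      }
    }

    @Override
    public void close() throws IOException {
      synchronized (lock) {
        if (client != null) {
          client.close();
          client = null;
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    GuardedReader reader = new GuardedReader();
    List<String> batch = Arrays.asList("id-1", "id-2");

    // One thread keeps acking while the main thread closes the reader.
    Thread acker = new Thread(() -> {
      for (int i = 0; i < 100_000; i++) {
        reader.ackBatch(batch); // safe before and after close()
      }
    });
    acker.start();
    reader.close();
    acker.join();

    System.out.println("ack after close accepted: " + reader.ackBatch(batch));
  }
}
```

Locking only removes the NPE. Whether a reader should be shared among threads at all, and what should happen to acks that arrive after the close, are separate design questions that this sketch does not answer.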