[ https://issues.apache.org/jira/browse/BEAM-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik resolved BEAM-4280. ----------------------------- Resolution: Fixed Fix Version/s: 2.5.0 > DirectStreamObserver for outbound channel can block indefinitely if invoked > from inbound channel thread causing deadlock > ------------------------------------------------------------------------------------------------------------------------ > > Key: BEAM-4280 > URL: https://issues.apache.org/jira/browse/BEAM-4280 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness > Reporter: Luke Cwik > Assignee: Luke Cwik > Priority: Major > Labels: portability > Fix For: 2.5.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > gRPC docs say that: > // Note: the onReadyHandler's invocation is serialized on the same > thread pool as the incoming StreamObserver's > // onNext(), onError(), and onComplete() handlers. Blocking the > onReadyHandler will prevent additional messages > // from being processed by the incoming StreamObserver. The > onReadyHandler must return in a timely manor or else > // message processing throughput will suffer. 
> Looking at the stack, it is because one of the gRPC threads is blocked waiting > for the channel to become ready, preventing that same thread from marking it > as ready: > "grpc-default-executor-0" #12 daemon prio=5 os_prio=0 tid=0x00007fcea88ee800 > nid=0x3cc8a waiting on condition [0x00007fce4b9f8000] > java.lang.Thread.State: WAITING (parking) > at (C/C++) 0x00007fcead7519f2 (Unknown Source) > at (C/C++) 0x00007fceac8b8f11 (Unknown Source) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0000000740e39c48> (a > java.util.concurrent.Phaser$QNode) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at java.util.concurrent.Phaser$QNode.block(Phaser.java:1140) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067) > at java.util.concurrent.Phaser.awaitAdvance(Phaser.java:730) > at > org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:51) > at > org.apache.beam.fn.harness.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:117) > at > org.apache.beam.fn.harness.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:53) > at > org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:161) > at > org.apache.beam.fn.harness.BeamFnDataWriteRunner$Factory$$Lambda$41/127245540.accept(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612) > at > com.google.cloud.dataflow.integration.synthetic.SyntheticStep.processElement(SyntheticStep.java:93) > at > com.google.cloud.dataflow.integration.synthetic.SyntheticStep$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:622) > at > org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.JavaReadViaImpulse$ReadFromBoundedSourceFn.readSoruce(JavaReadViaImpulse.java:139) > at > org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.JavaReadViaImpulse$ReadFromBoundedSourceFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612) > at > org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:129) > at > org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612) > at > 
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86) > at > org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408) > at > org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown > Source) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:80) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:135) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:123) > at > org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:51) > at > io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:379) > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessageRead.runInContext(ClientCallImpl.java:491) > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52) > at > io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian JIRA (v7.6.3#76005)