[ https://issues.apache.org/jira/browse/BEAM-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506482#comment-16506482 ]
Eugene Kirpichov commented on BEAM-4473:
----------------------------------------

I think the bug is that at [https://github.com/apache/beam/blob/69fb5c97a91ec9be3f481e827300cf796cbbfa19/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java#L83] we don't wrap the outbound observer into any sort of synchronization, unlike at [https://github.com/apache/beam/blob/69fb5c97a91ec9be3f481e827300cf796cbbfa19/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java#L130], where we end up wrapping it into [https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/StreamObserverFactory.java].

> Flaky org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution
> -----------------------------------------------------------------------------------
>
> Key: BEAM-4473
> URL: https://issues.apache.org/jira/browse/BEAM-4473
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
>
> Example run:
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6062/testReport/junit/org.apache.beam.runners.direct.portable/ReferenceRunnerTest/pipelineExecution/]
>
> {code:java}
> Error Message
> java.lang.IllegalStateException: sendHeaders has already been called
> Stacktrace
> java.lang.IllegalStateException: sendHeaders has already been called
>     at org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>     at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:104)
>     at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:282)
>     at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:112)
>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:224)
>     at org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactory$RemoteStageEvaluator.finishBundle(RemoteStageEvaluatorFactory.java:85)
>     at org.apache.beam.runners.direct.portable.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:154)
>     at org.apache.beam.runners.direct.portable.DirectTransformExecutor.run(DirectTransformExecutor.java:103)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>     Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:234)
>         ... 8 more
> Standard Output
> Shutting SDK harness down.
> Standard Error
> Jun 04, 2018 9:34:41 PM org.apache.beam.sdk.coders.SerializableCoder checkEqualsMethodDefined
> WARNING: Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
> Jun 04, 2018 9:34:41 PM org.apache.beam.sdk.coders.SerializableCoder checkEqualsMethodDefined
> WARNING: Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.control.FnApiControlClient closeAndTerminateOutstandingRequests
> SEVERE: FnApiControlClient closed, clearing outstanding requests {5=java.util.concurrent.CompletableFuture@1051ec6e[Not completed, 1 dependents], 6=java.util.concurrent.CompletableFuture@341889cc[Not completed, 1 dependents]}
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Control client connected with id
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Fn Harness started
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Logging client connected.
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Entering instruction processing loop
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Data client connected.
> Jun 04, 2018 9:34:45 PM org.apache.beam.runners.fnexecution.logging.GrpcLoggingService$InboundObserver onCompleted
> INFO: Logging client hanged up.
> java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: CANCELLED: Runner closed connection
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.beam.fn.harness.control.BeamFnControlClient.processInstructionRequests(BeamFnControlClient.java:158)
>     at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:157)
>     at org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory.lambda$createEnvironment$0(InProcessEnvironmentFactory.java:90)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Jun 04, 2018 9:34:45 PM org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver onError
> SEVERE: Failed to handle for url: "InProcessServer_4"
> io.grpc.StatusRuntimeException: CANCELLED: Multiplexer hanging up
>     at io.grpc.Status.asRuntimeException(Status.java:540)
>     at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)
>     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
>     at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
>     at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Jun 04, 2018 9:34:45 PM org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver onCompleted
> WARNING: Hanged up for unknown endpoint.
> Caused by: io.grpc.StatusRuntimeException: CANCELLED: Runner closed connection
>     at io.grpc.Status.asRuntimeException(Status.java:540)
>     at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)
>     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
>     at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
>     at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
>     ... 3 more
> {code}
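
For illustration only, here is a minimal sketch of the kind of synchronization the comment at the top says is missing around the outbound observer. It is not Beam's or gRPC's code: `Observer` is a hypothetical stand-in for gRPC's `StreamObserver` (so the example compiles without gRPC on the classpath), and `SynchronizedObserver`, `RecordingObserver`, and `runConcurrently` are names invented for this sketch. The assumption is that the underlying observer is not thread-safe, so callers on several threads must be serialized through one lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SynchronizedObserverSketch {
  /** Hypothetical stand-in for a gRPC-style stream observer. */
  interface Observer<T> {
    void onNext(T value);
    void onCompleted();
  }

  /** Wrapper that serializes every call to the delegate on a single lock. */
  static final class SynchronizedObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    private final Object lock = new Object();

    SynchronizedObserver(Observer<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public void onNext(T value) {
      synchronized (lock) {
        delegate.onNext(value);
      }
    }

    @Override
    public void onCompleted() {
      synchronized (lock) {
        delegate.onCompleted();
      }
    }
  }

  /** Deliberately non-thread-safe observer that records what it receives. */
  static final class RecordingObserver implements Observer<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;

    @Override
    public void onNext(Integer value) {
      received.add(value); // ArrayList is unsafe under concurrent writers
    }

    @Override
    public void onCompleted() {
      completed = true;
    }
  }

  /** Sends perThread values from each of `threads` threads through the wrapper. */
  static int runConcurrently(int threads, int perThread) {
    RecordingObserver raw = new RecordingObserver();
    Observer<Integer> safe = new SynchronizedObserver<>(raw);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch done = new CountDownLatch(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          safe.onNext(i);
        }
        done.countDown();
      });
    }
    try {
      done.await(); // all writers have finished once this returns
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for writers", e);
    } finally {
      pool.shutdown();
    }
    safe.onCompleted();
    return raw.received.size();
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently(4, 1000)); // prints 4000
  }
}
```

Because every call goes through the same lock, no two threads are ever inside the delegate at once, which is the property the unwrapped outbound observer appears to lack in the failing test. Whether the actual fix for BEAM-4473 takes this exact shape is not stated in this message.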