Thanks for the report. Is this with 2.12.0? If so, https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293 seems a strange place to get a NullPointerException. Is there perhaps an exception earlier in the code (which could be the root cause)?
On Tue, May 28, 2019 at 4:52 AM 青雉(祁明良) <[email protected]> wrote: > > Hi Robert, > > When I set the —artifacts-dir to hdfs location, I got a NPE exception. The > url is accessible via hadoop client. > > --------------- > ./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server > --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir > hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/ > [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver > - ArtifactStagingService started on localhost:8098 > [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver > - Java ExpansionService started on localhost:8097 > [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver > - JobService started on localhost:8099 > May 28, 2019 2:43:56 AM > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run > SEVERE: Exception while executing runnable > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@44065193 > java.lang.NullPointerException > at > org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > On 27 May 2019, at 9:49 PM, Robert Bradshaw <[email protected]> wrote: > > On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) <[email protected]> wrote: > > > Just now I try to use the PROCESS environment type, the Flink taskmanager > complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found > this directory is only created on the machine with beam job endpoint. I guess > maybe I should set the artifact-dir to a hdfs location, but no luck for me:( > > > Yes, you need to set your artifact staging directory (the > --artifacts-dir flag) to something visible to both the job server and > the workers. Did you try that? > > I don’t know if the following error message from job endpoint is related when > submitting the job. > > Error from job endpoint: > --------------- > [grpc-default-executor-0] ERROR > org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - > Encountered Unexpected Exception for Invocation > job_09aa2abd-0bc0-4994-a8b7-130156e4c13c > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534) > at > org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341) > at > org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262) > at > org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > On 27 May 2019, at 6:53 PM, Maximilian Michels <[email protected]> wrote: > > Hi Mingliang, > > The environment is created for each TaskManager. > > For docker, will it create one docker per flink taskmanager? > > > Yes. > > For process, does it mean start a python process to run the user code? And > it seems "command" should be set in the environment config, but what should > it be? > > > You will have to start the same Python SDK Harness which would run inside a > Docker container if you had chosen Docker. This is a more manual approach > which should only be chosen if you cannot use Docker. > > For external(loopback), does it mean let flink operator to call an external > service and by default set to the place where I submit the beam job? This > looks like all the data will be shift to a single machine and processed there. > > > This intended for a long-running SDK Harness which is already running when > you run your pipeline. Thus, you only provide the address to the already > running SDK Harness. > > Cheers, > Max > > On 26.05.19 13:51, 青雉(祁明良) wrote: > > Hi All, > I'm currently trying python portable runner with Flink. I see there are 3 > kinds of environment_type available "docker/process/external(loopback)" when > submit a job. But I didn't find any material explain more. > 1. For docker, will it create one docker per flink taskmanager? > 2. For process, does it mean start a python process to run the user > code? And it seems "command" should be set in the environment > config, but what should it be? > 3. For external(loopback), does it mean let flink operator to call an > external service and by default set to the place where I submit the > beam job? This looks like all the data will be shift to a single > machine and processed there. > Thanks, > Mingliang > 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁 止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发) > 本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本 邮件! > This communication may contain privileged or other confidential information > of Red. If you have received it in error, please advise the sender by reply > e-mail and immediately delete the message and any attachments without copying > or disclosing the contents. Thank you. > > > > 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! > This communication may contain privileged or other confidential information > of Red. If you have received it in error, please advise the sender by reply > e-mail and immediately delete the message and any attachments without copying > or disclosing the contents. Thank you. > > > > 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! > This communication may contain privileged or other confidential information > of Red. If you have received it in error, please advise the sender by reply > e-mail and immediately delete the message and any attachments without copying > or disclosing the contents. Thank you.
