IIRC, the default artifact directory is local, not HDFS, which would of course not be readable on the workers.
Good point about the missing HDFS parameters on the job server. It looks like, by default, it picks these up from the environment (the HADOOP_CONF_DIR / YARN_CONF_DIR variables)?
https://github.com/apache/beam/blob/release-2.12.0/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L48
I'm actually not that familiar with HDFS, so maybe someone else can chime in here. (But we should be throwing a better error than a NullPointerException.)
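For what it's worth, the --hdfs_host/--hdfs_port/--hdfs_user flags in the submit script below are pipeline options defined by the Python SDK (apache_beam.options.pipeline_options.HadoopFileSystemOptions), so as far as I can tell they only configure the HDFS client on the Python side, not the Java job server. An untested sketch of how they surface in Python, reusing the values from the command line below:

-----
from apache_beam.options.pipeline_options import (
    HadoopFileSystemOptions,
    PipelineOptions,
)

# The same flags passed to word_count.py, parsed as pipeline options.
options = PipelineOptions([
    '--hdfs_host=10.53.48.6',
    '--hdfs_port=4008',
    '--hdfs_user=data',
])

# The Python SDK's HDFS filesystem reads its connection settings from this
# view; the Java job server never sees these flags.
hdfs_options = options.view_as(HadoopFileSystemOptions)
print(hdfs_options.hdfs_host, hdfs_options.hdfs_port, hdfs_options.hdfs_user)
-----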
On Tue, May 28, 2019 at 10:19 AM 青雉(祁明良) <[email protected]> wrote:
>
> Yes, it is built from the release-2.12.0 branch. There was an NPE message at BeamFileSystemArtifactStagingService.java:239, but it shows up only on the first submission.
>
> Plus, I wonder why there are --hdfs_host / --hdfs_port / --hdfs_user arguments for the Python submission script, but not for the job server. If I leave the artifacts-dir at its default, the following submission script works fine (only through the load-data phase; the next phase fails because the artifact directory is not found), which means HDFS itself can be accessed.
>
> Submit script
> -----
> python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt --output out --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type PROCESS --environment_config "{\"command\":\"/opt/apache/beam/boot\"}" --hdfs_host 10.53.48.6 --hdfs_port 4008 --hdfs_user data
>
> Error log:
> -----------------------
> ./lib/beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
> May 28, 2019 8:08:10 AM org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
> SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable@18e94dfb
> java.lang.NullPointerException
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onNext(BeamFileSystemArtifactStagingService.java:239)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onNext(BeamFileSystemArtifactStagingService.java:196)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> May 28, 2019 8:08:10 AM org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
> SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5e1d1a8b
> java.lang.NullPointerException
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> On 28 May 2019, at 3:55 PM, Robert Bradshaw <[email protected]> wrote:
>
> Thanks for the report. Is this with 2.12.0? If so,
> https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293
> seems a strange place to get a NullPointerException. Is there perhaps an exception earlier in the code (which could be the root cause)?
>
> On Tue, May 28, 2019 at 4:52 AM 青雉(祁明良) <[email protected]> wrote:
>
> Hi Robert,
>
> When I set --artifacts-dir to an HDFS location, I got an NPE. The URL is accessible via the Hadoop client.
>
> ---------------
> ./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
> May 28, 2019 2:43:56 AM org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
> SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@44065193
> java.lang.NullPointerException
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> On 27 May 2019, at 9:49 PM, Robert Bradshaw <[email protected]> wrote:
>
> On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) <[email protected]> wrote:
>
> I just tried the PROCESS environment type, and the Flink TaskManager complains that "/tmp/beam-artifact-staging/job_xxx" is not found. I found that this directory is only created on the machine running the Beam job endpoint. I guess maybe I should set the artifacts-dir to an HDFS location, but no luck for me :(
>
> Yes, you need to set your artifact staging directory (the --artifacts-dir flag) to something visible to both the job server and the workers. Did you try that?
>
> I don't know if the following error message from the job endpoint is related when submitting the job.
>
> Error from job endpoint:
> ---------------
> [grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_09aa2abd-0bc0-4994-a8b7-130156e4c13c
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
>     at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> On 27 May 2019, at 6:53 PM, Maximilian Michels <[email protected]> wrote:
>
> Hi Mingliang,
>
> The environment is created for each TaskManager.
>
> For docker, will it create one docker per flink taskmanager?
>
> Yes.
>
> For process, does it mean starting a python process to run the user code? And it seems "command" should be set in the environment config, but what should it be?
>
> You will have to start the same Python SDK Harness that would run inside a Docker container if you had chosen Docker. This is a more manual approach that should only be chosen if you cannot use Docker.
>
> For external(loopback), does it mean the flink operator calls an external service, by default at the place where I submit the beam job? This looks like all the data will be shifted to a single machine and processed there.
>
> This is intended for a long-running SDK Harness that is already running when you launch your pipeline. Thus, you only provide the address of the already-running SDK Harness.
>
> Cheers,
> Max
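To make the three environment types above concrete, here is a rough, untested sketch of how each one is selected via Python pipeline options. The PROCESS config mirrors the submit script earlier in this thread; the EXTERNAL address is just an illustrative placeholder.

-----
from apache_beam.options.pipeline_options import PipelineOptions

# DOCKER: the runner starts one SDK harness container per TaskManager.
docker_options = PipelineOptions(['--environment_type=DOCKER'])

# PROCESS: the runner launches the SDK harness process itself; "command"
# points at the same boot binary the Docker image would otherwise run.
process_options = PipelineOptions([
    '--environment_type=PROCESS',
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])

# EXTERNAL: connect to an SDK harness that is already running; the address
# below is a placeholder for the long-running harness endpoint.
external_options = PipelineOptions([
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',
])
-----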
> On 26.05.19 13:51, 青雉(祁明良) wrote:
>
> Hi All,
> I'm currently trying the Python portable runner with Flink. I see there are 3 kinds of environment_type available when submitting a job: "docker/process/external(loopback)", but I didn't find much material explaining them.
> 1. For docker, will it create one docker per flink taskmanager?
> 2. For process, does it mean starting a python process to run the user code? And it seems "command" should be set in the environment config, but what should it be?
> 3. For external(loopback), does it mean the flink operator calls an external service, by default at the place where I submit the beam job? This looks like all the data will be shifted to a single machine and processed there.
> Thanks,
> Mingliang
