Hi Buvana,

The usual cause of errors like this is a version mismatch between the Python SDK and the Spark job server. Since you are building the job server from source, I would make sure you have checked out the same version as the Python SDK you are using.
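For example, a quick way to line the two up (a rough sketch; the version number shown is only an illustration):

# Check which Python SDK version is installed (output shown is hypothetical)
pip show apache-beam | grep Version    # e.g. Version: 2.19.0

# In your Beam source tree, check out the matching release tag
# and restart the job server from it
cd beam
git checkout v2.19.0
./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077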
Hope that helps.

Kyle

On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:

> Hello,
>
> I am trying to test a Beam Python pipeline on the SparkRunner with Spark on Mesos. I followed the instructions here:
> https://beam.apache.org/documentation/runners/spark/
>
> The portable job server is up and running and is pointing to a valid Spark master URL (Spark on Mesos).
>
> However, a simple Beam Python pipeline (which works totally fine on the local runner) submitted to this job server fails with the error displayed below my signature. It appears that the job server finds issues with the pipeline syntax; maybe it is expecting JVM pipelines and got a Python one?
>
> I would appreciate any pointers that you can provide.
>
> Thank you,
> Regards,
> Buvana
>
> Client Side:
> =========
> $ python test-beam.py
>
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
> Traceback (most recent call last):
>   File "test-beam.py", line 117, in <module>
>     beam.io.WriteToText(output_filename)
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
>     self.run().wait_until_finish()
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 461, in run
>     self._options).run(False)
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 474, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in run_pipeline
>     retrieval_token=retrieval_token))
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>   status = StatusCode.INVALID_ARGUMENT
>   details = ""
>   debug_error_string = "{"created":"@1586818954.316495237","description":"Error received from peer ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
>
> -----------------------------------------------------------------------
> Job Server:
> =========
> tfs@datamon4:/nas2/tfs/beam$ ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077
>
> Starting a Gradle Daemon (subsequent builds will be faster)
> Configuration on demand is an incubating feature.
> > Task :runners:spark:job-server:runShadow
> Listening for transport dt_socket at address: 5005
> 20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: LegacyArtifactStagingService started on localhost:8098
> 20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
> 20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: JobService started on localhost:8099
> 20/04/13 19:02:34 WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService: Encountered Unexpected Exception during validation
> java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)_6
>     at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
>     at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
>     at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
>     at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
>     at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
>     at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
>     ... 16 more
> From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, April 13, 2020 at 6:55 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: SparkRunner on k8s
>
> Kyle,
>
> Thanks a lot for the pointers. I got interested in running my Beam pipeline on the FlinkRunner, set up a local Flink cluster, and verified that a sample job works fine.
>
> I started the Beam job server with:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099
>
> I then submitted a Beam pipeline that works totally fine when run with the local runner. The last stage of the pipeline code looks as follows:
>
> . . .
> output = (
>     {
>         'Mean Open': mean_open,
>         'Mean Close': mean_close
>     }
>     | beam.CoGroupByKey()
>     | beam.io.WriteToText(args.output)
> )
>
> So we are ending the pipeline with beam.io.WriteToText().
>
> Now, when I supply a filename, whether residing on a local disk (/tmp) or a network-mounted disk (e.g. /nas2), I get the following error:
>
> python test-beam.py --input data/sp500.csv --output /tmp/result.txt
>
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
>     writer = sink.open_writer(init_result, str(uuid.uuid4()))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
>     return FileBasedSinkWriter(self, writer_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
>     self.temp_handle = self.sink.open(temp_shard_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
>     file_handle = super(_TextSink, self).open(temp_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
>     return FileSystems.create(temp_path, self.mime_type, self.compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
>     return filesystem.create(path, mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
>     return self._path_open(path, 'wb', mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
>     raw_file = open(path, mode)
> FileNotFoundError: [Errno 2] No such file or directory:
> '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
>
> It appears that the filesystem on the client side is not the same as the one in the environment that Flink creates to run the Beam pipeline. (I think Flink does a docker run of the Python SDK to execute the Beam pipeline? In that case, how would the container know where to write the file?)
>
> Please help me debug. The Flink monitoring dashboard shows the several stages of the job (Map, Reduce, and so on); in the end, the status is FAILED.
>
> -Buvana
>
> From: Kyle Weaver <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, April 13, 2020 at 11:57 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: SparkRunner on k8s
>
> Hi Buvana,
>
> Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately there is no guide that I know of for Spark yet; however, we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark operators [3], are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.
>
> - Kyle
>
> [1] https://beam.apache.org/roadmap/portability/
> [2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
> [3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
> [4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870
>
> On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:
>
> Thank you, Rahul, for your very useful response. Can you please extend your response by commenting on the procedure for a Beam Python pipeline?
>
> From: rahul patwari <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, April 10, 2020 at 10:57 PM
> To: user <[email protected]>
> Subject: Re: SparkRunner on k8s
>
> Hi Buvana,
>
> You can submit a Beam pipeline to Spark on k8s like any other Spark pipeline, using the spark-submit script.
>
> Create an uber jar of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit. Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn more about running Spark on k8s.
>
> The Beam pipeline will be translated to a Spark pipeline using the Spark APIs at runtime.
>
> Regards,
> Rahul
>
> On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:
>
> Hello,
>
> I newly joined this group and went through the archive to see if any discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
>
> I run my Spark jobs on a k8s cluster in cluster mode, and I would like to deploy my Beam pipeline on a SparkRunner with k8s underneath.
>
> The Beam documentation at
> https://beam.apache.org/documentation/runners/spark/
> does not discuss k8s (though there is mention of Mesos and YARN).
>
> Can someone please point me to relevant material in this regard? Or provide the steps for running my Beam pipeline in this configuration?
> Thank you,
> Regards,
> Buvana
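For reference, the spark-submit route Rahul describes above might look roughly like this for a Java pipeline packaged as an uber jar. This is a sketch only; the jar name, main class, image, and master URL below are hypothetical placeholders, not values from this thread.

# Submit the uber jar as the primary resource, with the k8s master
# and container image passed as spark-submit arguments
spark-submit \
  --master k8s://https://<k8s-apiserver-host>:6443 \
  --deploy-mode cluster \
  --class com.example.MyBeamPipeline \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  my-beam-pipeline-bundled.jar \
  --runner=SparkRunner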

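And regarding the FileNotFoundError quoted earlier in the thread: with the portable runners, the Python SDK harness runs in its own Docker container by default, so a path like /tmp/result.txt resolves inside that container rather than on the submitting host. For a quick local sanity check, one option (assuming a reasonably recent SDK; the job endpoint below is a hypothetical placeholder) is to run the harness inside the submitting process instead:

# LOOPBACK runs the SDK harness in the local Python process,
# so local output paths resolve on the client machine
python test-beam.py --input data/sp500.csv --output /tmp/result.txt \
  --runner=PortableRunner \
  --job_endpoint=<job-server-host>:8099 \
  --environment_type=LOOPBACK

For a real cluster run, the output should point at a filesystem all workers can reach (e.g. HDFS, S3, or GCS) rather than a container-local path.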