Hi Buvana,

The usual cause of errors like this is a mismatch between the Python SDK
and the Spark job server. Since you are building the job server from
source, I would make sure you have checked out the same version as the
Python SDK you are using.

Hope that helps.
Kyle

On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
[email protected]> wrote:

> Hello,
>
>
>
> I am trying to test the Beam Python pipeline on SparkRunner with Spark on
> Mesos. Followed the instructions here:
>
> https://beam.apache.org/documentation/runners/spark/
>
>
>
> The portable job  runner is up and running and is pointing to a valid
> Spark Master URL (Spark on Mesos).
>
>
>
> However, a simple Beam Python Pipeline (works totally fine on Local
> Runner) submitted to this job runner fails with error code displayed below
> my sign. It appears that the job runner finds issues with the pipeline
> syntax – may be its looking for jvm pipelines and got a Python pipeline?
>
>
>
> I would appreciate any pointers that you can provide.
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>
>
> Client Side:
>
> =========
>
> $ python test-beam.py
>
>
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
>
> Traceback (most recent call last):
>
>   File "test-beam.py", line 117, in <module>
>
>     beam.io.WriteToText(output_filename)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 481, in __exit__
>
>     self.run().wait_until_finish()
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 461, in run
>
>     self._options).run(False)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py",
> line 474, in run
>
>     return self.runner.run_pipeline(self, self._options)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 317, in run_pipeline
>
>     retrieval_token=retrieval_token))
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line
> 826, in __call__
>
>     return _end_unary_response_blocking(state, call, False, None)
>
>   File
> "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line
> 729, in _end_unary_response_blocking
>
>     raise _InactiveRpcError(state)
>
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated
> with:
>
>                 status = StatusCode.INVALID_ARGUMENT
>
>                 details = ""
>
>                 debug_error_string =
> "{"created":"@1586818954.316495237","description":"Error received from peer
> ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
>
>  -----------------------------------------------------------------------
>
> job Runner:
>
>  =========
>
> tfs@datamon4:/nas2/tfs/beam$ ./gradlew
> :runners:spark:job-server:runShadow
> -PsparkMasterUrl=spark://$SPARK_MASTER:7077
>
> Starting a Gradle Daemon (subsequent builds will be faster)
>
> Configuration on demand is an incubating feature.
>
>
>
> > Task :runners:spark:job-server:runShadow
>
> Listening for transport dt_socket at address: 5005
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver:
> LegacyArtifactStagingService started on localhost:8098
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java
> ExpansionService started on localhost:8097
>
> 20/04/13 19:02:04 INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver:
> JobService started on localhost:8099
>
> 20/04/13 19:02:34 WARN
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService:
> Encountered Unexpected Exception during validation
>
> java.lang.RuntimeException: Failed to validate transform
> ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn
> )_6
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
>
>         at
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
>
>         at
> org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>
>         at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.IllegalArgumentException
>
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
>
>         at
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
>
>         ... 16 more
>
>
>
>
>
> *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)" <
> [email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, April 13, 2020 at 6:55 PM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: SparkRunner on k8s
>
>
>
> Kyle,
>
>
>
> Thanks a lot for the pointers. I got interested to run my beam pipeline on
> FlinkRunner and got a local Flink cluster setup, tested a sample code to
> work fine.
>
>
>
> I started the Beam job runner going:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master
> $IP:8081 --job-host $IP  --job-port 8099
>
>
>
> Submitted a beam pipeline, which when run with LocalRunner works totally
> fine. The last stage of the pipeline code looks as follows:
>
> . . .
>
> . . .
>
> . . .
>
>     output= (
>
>         {
>
>             'Mean Open': mean_open,
>
>             'Mean Close': mean_close
>
>         } |
>
>         beam.CoGroupByKey() |
>
>         beam.io.WriteToText(args.output)
>
>     )
>
>
>
> So, we are ending the pipeline with a io.WriteToText()
>
>
>
> Now, when I supply a filename, whether residing in local disk (/tmp) or
> network mounted disk(e.g /nas2), I get the following error:
>
> python test-beam.py –input data/sp500.csv –output /tmp/result.txt
>
>
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
>
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>
>   File "apache_beam/runners/common.py", line 883, in
> apache_beam.runners.common.DoFnRunner.process
>
>   File "apache_beam/runners/common.py", line 667, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>
>   File "apache_beam/runners/common.py", line 748, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py",
> line 1095, in _finalize_write
>
>     writer = sink.open_writer(init_result, str(uuid.uuid4()))
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 140, in _f
>
>     return fnc(self, *args, **kwargs)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 191, in open_writer
>
>     return FileBasedSinkWriter(self, writer_path)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 395, in __init__
>
>     self.temp_handle = self.sink.open(temp_shard_path)
>
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py",
> line 397, in open
>
>     file_handle = super(_TextSink, self).open(temp_path)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 140, in _f
>
>     return fnc(self, *args, **kwargs)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 134, in open
>
>     return FileSystems.create(temp_path, self.mime_type,
> self.compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 217, in create
>
>     return filesystem.create(path, mime_type, compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py",
> line 155, in create
>
>     return self._path_open(path, 'wb', mime_type, compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py",
> line 137, in _path_open
>
>     raw_file = open(path, mode)
>
> FileNotFoundError: [Errno 2] No such file or directory:
> '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
>
>
>
>
>
> It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
>
>
> Please help me debug. The Flink monitoring dashboard shows the several
> stages of the job, Map, Reduce and what not… In the end, the status is
> FAILED.
>
>
>
> -Buvana
>
>
>
> *From: *Kyle Weaver <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, April 13, 2020 at 11:57 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: SparkRunner on k8s
>
>
>
> Hi Buvana,
>
>
>
> Running Beam Python on Spark on Kubernetes is more complicated, because
> Beam has its own solution for running Python code [1]. Unfortunately
> there's no guide that I know of for Spark yet, however we do have
> instructions for Flink [2]. Beam's Flink and Spark runners, and I assume
> GCP's (unofficial) Flink and Spark [3] operators, are probably similar
> enough that it shouldn't be too hard to port the YAML from the Flink
> operator to the Spark operator. I filed an issue for it [4], but I probably
> won't have the bandwidth to work on it myself for a while.
>
>
>
> - Kyle
>
>
>
> [1] https://beam.apache.org/roadmap/portability/
>
> [2]
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
>
> [3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
>
> [4]
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870
>
>
>
> On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
> [email protected]> wrote:
>
> Thank you, Rahul for your very useful response. Can you please extend your
> response by commenting on the procedure for Beam python pipeline?
>
>
>
> *From: *rahul patwari <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Friday, April 10, 2020 at 10:57 PM
> *To: *user <[email protected]>
> *Subject: *Re: SparkRunner on k8s
>
>
>
> Hi Buvana,
>
>
>
> You can submit a Beam Pipeline to Spark on k8s like any other Spark
> Pipeline using the spark-submit script.
>
>
>
> Create an Uber Jar of your Beam code and provide it as the primary
> resource to spark-submit. Provide the k8s master and the container image to
> use as arguments to spark-submit.
>
> Refer https://spark.apache.org/docs/latest/running-on-kubernetes.html to
> know more about how to run Spark on k8s.
>
>
>
> The Beam pipeline will be translated to a Spark Pipeline using Spark APIs
> in Runtime.
>
>
>
> Regards,
>
> Rahul
>
>
>
> On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <
> [email protected]> wrote:
>
> Hello,
>
>
>
> I newly joined this group and I went through the archive to see if any
> discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
>
>
>
> I run my Spark jobs on a k8s cluster in the cluster mode. Would like to
> deploy my beam pipeline on a SparkRunner with k8s underneath.
>
>
>
> The Beam documentation:
>
> https://beam.apache.org/documentation/runners/spark/
>
> does not discuss about k8s (though there is mention of Mesos and YARN).
>
>
>
> Can someone please point me to relevant material in this regard? Or,
> provide the steps for running my beam pipeline in this configuration?
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>

Reply via email to