Kyle,
Thank you for the response. I have pasted my simple Python code (with the IP masked out) below
my signature; it works fine with the default runner.
PipelineOptions are:
"--runner=PortableRunner",
"--job_endpoint=XXXXXXXXXX:8099"
(Replace XXXXXXXXXX with the IP of the job runner.)
I use ParDo.
-Buvana
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Printer(beam.DoFn):
    def process(self, data_item):
        print(data_item)

class DateExtractor(beam.DoFn):
    def process(self, data_item):
        return [data_item.split(',')[0]]

# Instantiate the pipeline.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXXX:8099"
])

with beam.Pipeline(options=options) as p:
    data_from_source = (p
        | 'ReadMyFile 01' >> beam.io.ReadFromText('apache-beam-tutorial/data/sp500.csv')
        | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
        | 'Printer the data 02' >> beam.ParDo(Printer())
    )
From: Kyle Weaver <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, April 16, 2020 at 3:16 PM
To: "[email protected]" <[email protected]>
Subject: Re: SparkRunner on k8s
Hi Buvana,
After looking a bit closer, it seems this might be a bug in Beam. If you could
share a) the pipeline options you are using and b) which source (read)
transform you are using in your pipeline, that would be helpful for debugging.
Thanks,
Kyle
On Wed, Apr 15, 2020 at 11:21 PM Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]<mailto:[email protected]>>
wrote:
Kyle,
I also built the Python SDK from source, on the same branch (release-2.19.0) that the
job runner is using. The same error manifests (INVALID_ARGUMENT).
This has been a very tedious venture with no luck so far. Hope to get something
working soon.
-Buvana
From: "Ramanan, Buvana (Nokia - US/Murray Hill)"
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Wednesday, April 15, 2020 at 9:05 PM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: SparkRunner on k8s
Hi Kyle,
Thanks a lot for pointing that out. I am using Python SDK version 2.19.0; per your
email, I did a git checkout of the release-2.19.0 branch, ran the portable runner
again, and I still encounter this error. ☹
-Buvana
From: Kyle Weaver <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, April 15, 2020 at 2:48 PM
To: "[email protected]" <[email protected]>
Subject: Re: SparkRunner on k8s
Hi Buvana,
The usual cause of errors like this is a mismatch between the Python SDK and
the Spark job server. Since you are building the job server from source, I
would make sure you have checked out the same version as the Python SDK you are
using.
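For example, one quick client-side sanity check (just a sketch from me, not something from your logs) is to print the installed SDK version and compare it against the branch the job server was built from:

import apache_beam
# Should match the job server's branch, e.g. "2.19.0" for release-2.19.0.
print(apache_beam.__version__)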
Hope that helps.
Kyle
On Mon, Apr 13, 2020 at 7:41 PM Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]<mailto:[email protected]>>
wrote:
Hello,
I am trying to test a Beam Python pipeline on the SparkRunner, with Spark on
Mesos. I followed the instructions here:
https://beam.apache.org/documentation/runners/spark/
The portable job runner is up and running and is pointing to a valid Spark
Master URL (Spark on Mesos).
However, a simple Beam Python pipeline (which works fine on the local runner)
submitted to this job runner fails with the error displayed below my signature. It
appears that the job runner finds issues with the pipeline syntax; maybe it is
expecting JVM pipelines and got a Python pipeline?
I would appreciate any pointers that you can provide.
Thank you,
Regards,
Buvana
Client Side:
=========
$ python test-beam.py
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Traceback (most recent call last):
  File "test-beam.py", line 117, in <module>
    beam.io.WriteToText(output_filename)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in run_pipeline
    retrieval_token=retrieval_token))
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/tfs/venv_beam3/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
  status = StatusCode.INVALID_ARGUMENT
  details = ""
  debug_error_string = "{"created":"@1586818954.316495237","description":"Error received from peer ipv4:XXXXXXXXXX:8000","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"","grpc_status":3}"
-----------------------------------------------------------------------
Job Runner:
=========
tfs@datamon4:/nas2/tfs/beam$ ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://$SPARK_MASTER:7077
Starting a Gradle Daemon (subsequent builds will be faster)
Configuration on demand is an incubating feature.
> Task :runners:spark:job-server:runShadow
Listening for transport dt_socket at address: 5005
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: LegacyArtifactStagingService started on localhost:8098
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/04/13 19:02:04 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver: JobService started on localhost:8099
20/04/13 19:02:34 WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService: Encountered Unexpected Exception during validation
java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_ReadFromText/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)_6
  at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:215)
  at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:123)
  at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:103)
  at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:223)
  at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
  at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateParDo(PipelineValidator.java:238)
  at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:213)
  ... 16 more
From: "Ramanan, Buvana (Nokia - US/Murray Hill)"
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Monday, April 13, 2020 at 6:55 PM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: SparkRunner on k8s
Kyle,
Thanks a lot for the pointers. I became interested in running my Beam pipeline on the
FlinkRunner, set up a local Flink cluster, and verified that a sample job works fine.
I started the Beam job server as follows:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099
I then submitted a Beam pipeline that works fine when run with the local runner.
The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
output = (
    {
        'Mean Open': mean_open,
        'Mean Close': mean_close
    }
    | beam.CoGroupByKey()
    | beam.io.WriteToText(args.output)
)
So, we end the pipeline with beam.io.WriteToText().
Now, whether I supply a filename residing on local disk (/tmp) or on a
network-mounted disk (e.g., /nas2), I get the following error:
python test-beam.py --input data/sp500.csv --output /tmp/result.txt
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
It appears that the filesystem on the client side is not the same as the one in the
environment Flink creates to run the Beam pipeline. (I think Flink does a docker run
of the Python SDK to execute the Beam pipeline? In that case, how would the
container know where to write the file?)
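If the harness really does run in its own container, I wonder whether running it inside the client process instead would let WriteToText see my local disk. A minimal sketch of what I have in mind (untested, and assuming the portable runner's --environment_type=LOOPBACK option applies here):

from apache_beam.options.pipeline_options import PipelineOptions

# Untested sketch: LOOPBACK runs the SDK harness in the submitting process,
# so paths passed to WriteToText resolve against the client's filesystem
# rather than the SDK container's.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=$IP:8099",
    "--environment_type=LOOPBACK",
])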
Please help me debug. The Flink monitoring dashboard shows the several stages of the
job (Map, Reduce, and so on); in the end, the status is FAILED.
-Buvana
From: Kyle Weaver <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, April 13, 2020 at 11:57 AM
To: "[email protected]" <[email protected]>
Subject: Re: SparkRunner on k8s
Hi Buvana,
Running Beam Python on Spark on Kubernetes is more complicated, because Beam
has its own solution for running Python code [1]. Unfortunately, there's no
guide that I know of for Spark yet; however, we do have instructions for Flink
[2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and
Spark [3] operators, are probably similar enough that it shouldn't be too hard
to port the YAML from the Flink operator to the Spark operator. I filed an
issue for it [4], but I probably won't have the bandwidth to work on it myself
for a while.
- Kyle
[1] https://beam.apache.org/roadmap/portability/
[2]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870
On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]<mailto:[email protected]>>
wrote:
Thank you, Rahul, for your very useful response. Could you please extend it by
commenting on the procedure for a Beam Python pipeline?
From: rahul patwari <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <[email protected]>
Subject: Re: SparkRunner on k8s
Hi Buvana,
You can submit a Beam pipeline to Spark on k8s like any other Spark pipeline, using
the spark-submit script.
Create an uber JAR of your Beam code and provide it as the primary resource to
spark-submit. Provide the k8s master and the container image to use as
arguments to spark-submit.
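For a Java pipeline, the submission might look roughly like this (a rough sketch; the master URL, container image, main class, and JAR path below are placeholders, not taken from this thread):

spark-submit \
  --master k8s://https://<k8s-apiserver>:<port> \
  --deploy-mode cluster \
  --class org.example.MyBeamPipeline \
  --conf spark.kubernetes.container.image=<spark-container-image> \
  local:///opt/app/my-beam-pipeline-uber.jar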
Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn
more about how to run Spark on k8s.
The Beam pipeline will be translated into a Spark pipeline using Spark APIs at
runtime.
Regards,
Rahul
On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]<mailto:[email protected]>>
wrote:
Hello,
I newly joined this group and went through the archive to see whether any
discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
I run my Spark jobs on a k8s cluster in cluster mode, and I would like to deploy
my Beam pipeline on a SparkRunner with k8s underneath.
The Beam documentation:
https://beam.apache.org/documentation/runners/spark/
does not discuss k8s (though it mentions Mesos and YARN).
Can someone please point me to relevant material in this regard, or provide the
steps for running my Beam pipeline in this configuration?
Thank you,
Regards,
Buvana