> In other words, are there options to the job runner that would eventually
translate to ' --volume /storage1:/storage1 ' while the docker container is
being run by Flink? Even if it means code changes and building from source,
its fine. Please point me in the right direction.

I found an open feature request for this, but unfortunately it looks like
neither of two attempted implementations ended up being merged:
https://issues.apache.org/jira/browse/BEAM-5440

Sorry I haven't had much time to look into your issue with the Spark
runner. If you are still interested in trying it, you might try using a
different Beam version and see if the problem persists.

On Wed, Apr 22, 2020 at 7:56 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
[email protected]> wrote:

> Hi Kyle,
>
> About FlinkRunner:
>
> "One is you can mount a directory from the Docker host inside the
> container(s). But the more scalable solution is to use a distributed file
> system, such as HDFS, Google Cloud Storage, or Amazon S3"
>
> I am running some benchmarking tests and so I prefer not to use GCS or S3
> (as the network delay can kill the performance).
>
> I would like to focus on the option of the host mounting the volume into
> the containers, but I have not come across a docker command where a host
> can mount volumes into running containers. I do not think 'docker create'
> volume will help here, please correct if I am wrong.
>
> Is there a way the job runner can tell the Flink cluster to mount certain
> volumes before running the sdk container? And if so, is there a way I can
> tell the job runner to tell Flink to mount these volumes?
>
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> its fine. Please point me in the right direction.
>
> Thanks,
> Buvana
> ------------------------------
> *From:* Kyle Weaver <[email protected]>
> *Sent:* Monday, April 13, 2020 7:34 PM
> *To:* [email protected] <[email protected]>
> *Subject:* Re: SparkRunner on k8s
>
> > It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
>
> You are correct. Beam Python execution takes place within a Docker
> container, or often multiple containers, depending on your pipeline and
> configuration. Multiple containers is probably the cause of the error here.
> The Python SDK doesn't do anything special with local file paths; it just
> writes them to the local file system of the container. So in order to get a
> persistent, shared file system, you have a couple options. One is you can
> mount a directory from the Docker host inside the container(s). But the
> more scalable solution is to use a distributed file system, such as HDFS,
> Google Cloud Storage, or Amazon S3. Check out the Beam programming guide
> for more info:
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
>
> On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
> [email protected]> wrote:
>
> Kyle,
>
>
>
> Thanks a lot for the pointers. I got interested to run my beam pipeline on
> FlinkRunner and got a local Flink cluster setup, tested a sample code to
> work fine.
>
>
>
> I started the Beam job runner going:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master
> $IP:8081 --job-host $IP  --job-port 8099
>
>
>
> Submitted a beam pipeline, which when run with LocalRunner works totally
> fine. The last stage of the pipeline code looks as follows:
>
> . . .
>
> . . .
>
> . . .
>
>     output= (
>
>         {
>
>             'Mean Open': mean_open,
>
>             'Mean Close': mean_close
>
>         } |
>
>         beam.CoGroupByKey() |
>
>         beam.io.WriteToText(args.output)
>
>     )
>
>
>
> So, we are ending the pipeline with a io.WriteToText()
>
>
>
> Now, when I supply a filename, whether residing in local disk (/tmp) or
> network mounted disk(e.g /nas2), I get the following error:
>
> python test-beam.py –input data/sp500.csv –output /tmp/result.txt
>
>
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
>
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>
>   File "apache_beam/runners/common.py", line 883, in
> apache_beam.runners.common.DoFnRunner.process
>
>   File "apache_beam/runners/common.py", line 667, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>
>   File "apache_beam/runners/common.py", line 748, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py",
> line 1095, in _finalize_write
>
>     writer = sink.open_writer(init_result, str(uuid.uuid4()))
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 140, in _f
>
>     return fnc(self, *args, **kwargs)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 191, in open_writer
>
>     return FileBasedSinkWriter(self, writer_path)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 395, in __init__
>
>     self.temp_handle = self.sink.open(temp_shard_path)
>
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py",
> line 397, in open
>
>     file_handle = super(_TextSink, self).open(temp_path)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 140, in _f
>
>     return fnc(self, *args, **kwargs)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 134, in open
>
>     return FileSystems.create(temp_path, self.mime_type,
> self.compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 217, in create
>
>     return filesystem.create(path, mime_type, compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py",
> line 155, in create
>
>     return self._path_open(path, 'wb', mime_type, compression_type)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py",
> line 137, in _path_open
>
>     raw_file = open(path, mode)
>
> FileNotFoundError: [Errno 2] No such file or directory:
> '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
>
>
>
>
>
> It appears that the filesystem in the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the python sdk to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
>
>
> Please help me debug. The Flink monitoring dashboard shows the several
> stages of the job, Map, Reduce and what not… In the end, the status is
> FAILED.
>
>
>
> -Buvana
>
>
>
> *From: *Kyle Weaver <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, April 13, 2020 at 11:57 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: SparkRunner on k8s
>
>
>
> Hi Buvana,
>
>
>
> Running Beam Python on Spark on Kubernetes is more complicated, because
> Beam has its own solution for running Python code [1]. Unfortunately
> there's no guide that I know of for Spark yet, however we do have
> instructions for Flink [2]. Beam's Flink and Spark runners, and I assume
> GCP's (unofficial) Flink and Spark [3] operators, are probably similar
> enough that it shouldn't be too hard to port the YAML from the Flink
> operator to the Spark operator. I filed an issue for it [4], but I probably
> won't have the bandwidth to work on it myself for a while.
>
>
>
> - Kyle
>
>
>
> [1] https://beam.apache.org/roadmap/portability/
>
> [2]
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
>
> [3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
>
> [4]
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870
>
>
>
> On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
> [email protected]> wrote:
>
> Thank you, Rahul for your very useful response. Can you please extend your
> response by commenting on the procedure for Beam python pipeline?
>
>
>
> *From: *rahul patwari <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Friday, April 10, 2020 at 10:57 PM
> *To: *user <[email protected]>
> *Subject: *Re: SparkRunner on k8s
>
>
>
> Hi Buvana,
>
>
>
> You can submit a Beam Pipeline to Spark on k8s like any other Spark
> Pipeline using the spark-submit script.
>
>
>
> Create an Uber Jar of your Beam code and provide it as the primary
> resource to spark-submit. Provide the k8s master and the container image to
> use as arguments to spark-submit.
>
> Refer https://spark.apache.org/docs/latest/running-on-kubernetes.html to
> know more about how to run Spark on k8s.
>
>
>
> The Beam pipeline will be translated to a Spark Pipeline using Spark APIs
> in Runtime.
>
>
>
> Regards,
>
> Rahul
>
>
>
> On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <
> [email protected]> wrote:
>
> Hello,
>
>
>
> I newly joined this group and I went through the archive to see if any
> discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
>
>
>
> I run my Spark jobs on a k8s cluster in the cluster mode. Would like to
> deploy my beam pipeline on a SparkRunner with k8s underneath.
>
>
>
> The Beam documentation:
>
> https://beam.apache.org/documentation/runners/spark/
>
> does not discuss about k8s (though there is mention of Mesos and YARN).
>
>
>
> Can someone please point me to relevant material in this regard? Or,
> provide the steps for running my beam pipeline in this configuration?
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>

Reply via email to