> In other words, are there options to the job runner that would eventually translate to '--volume /storage1:/storage1' while the docker container is being run by Flink? Even if it means code changes and building from source, it's fine. Please point me in the right direction.

I found an open feature request for this, but unfortunately it looks like neither of the two attempted implementations ended up being merged: https://issues.apache.org/jira/browse/BEAM-5440

Sorry I haven't had much time to look into your issue with the Spark runner. If you are still interested in trying it, you might try using a different Beam version and see if the problem persists.
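
Coming back to the volume question: one workaround that does not need the unmerged feature is to skip the DOCKER environment entirely and run the SDK harness yourself as an external worker pool, with the directory already mounted. This is a rough, untested sketch; the image tag, port and addresses are illustrative, and it assumes a Beam version whose SDK container supports the --worker_pool flag and whose portable runner supports --environment_type=EXTERNAL (check your release):

# On every host that runs a Flink TaskManager, start a Beam Python worker pool
# with the shared directory bind-mounted (it listens on port 50000 by default):
docker run -d --net=host -v /storage1:/storage1 \
    apachebeam/python3.6_sdk:latest --worker_pool

# Submit the pipeline with the EXTERNAL environment pointing at that pool,
# instead of letting the TaskManager docker-run the SDK container itself.
# (Assumes test-beam.py forwards its command line to PipelineOptions.)
python test-beam.py \
    --input data/sp500.csv \
    --output /storage1/result.txt \
    --runner PortableRunner \
    --job_endpoint $IP:8099 \
    --environment_type EXTERNAL \
    --environment_config localhost:50000   # pool address as seen from the TaskManager

With that in place, any path under /storage1 is visible to every SDK worker, so writing to it behaves the way a local run does.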

On Wed, Apr 22, 2020 at 7:56 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:

> Hi Kyle,
>
> About FlinkRunner:
>
> "One is you can mount a directory from the Docker host inside the container(s). But the more scalable solution is to use a distributed file system, such as HDFS, Google Cloud Storage, or Amazon S3"
>
> I am running some benchmarking tests, so I prefer not to use GCS or S3 (as the network delay can kill the performance).
>
> I would like to focus on the option of the host mounting the volume into the containers, but I have not come across a docker command where a host can mount volumes into running containers. I do not think 'docker create' volume will help here; please correct me if I am wrong.
>
> Is there a way the job runner can tell the Flink cluster to mount certain volumes before running the SDK container? And if so, is there a way I can tell the job runner to tell Flink to mount these volumes?
>
> In other words, are there options to the job runner that would eventually translate to '--volume /storage1:/storage1' while the docker container is being run by Flink? Even if it means code changes and building from source, it's fine. Please point me in the right direction.
>
> Thanks,
> Buvana
> ------------------------------
> *From:* Kyle Weaver <[email protected]>
> *Sent:* Monday, April 13, 2020 7:34 PM
> *To:* [email protected] <[email protected]>
> *Subject:* Re: SparkRunner on k8s
>
> > It appears that the filesystem on the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)
>
> You are correct. Beam Python execution takes place within a Docker container, or often multiple containers, depending on your pipeline and configuration. Multiple containers is probably the cause of the error here. The Python SDK doesn't do anything special with local file paths; it just writes them to the local file system of the container. So in order to get a persistent, shared file system, you have a couple of options. One is you can mount a directory from the Docker host inside the container(s). But the more scalable solution is to use a distributed file system, such as HDFS, Google Cloud Storage, or Amazon S3. Check out the Beam programming guide for more info:
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
>
> On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:
>
> Kyle,
>
> Thanks a lot for the pointers. I got interested in running my Beam pipeline on the FlinkRunner, set up a local Flink cluster, and tested a sample job to make sure it works fine.
>
> I started the Beam job runner by running:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099
>
> I submitted a Beam pipeline, which works totally fine when run with the LocalRunner. The last stage of the pipeline code looks as follows:
>
> . . .
> . . .
> . . .
> output = (
>     {
>         'Mean Open': mean_open,
>         'Mean Close': mean_close
>     } |
>     beam.CoGroupByKey() |
>     beam.io.WriteToText(args.output)
> )
>
> So, we are ending the pipeline with an io.WriteToText().
>
> Now, when I supply a filename, whether residing on local disk (/tmp) or a network-mounted disk (e.g. /nas2), I get the following error:
>
> python test-beam.py --input data/sp500.csv --output /tmp/result.txt
>
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1095, in _finalize_write
>     writer = sink.open_writer(init_result, str(uuid.uuid4()))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 191, in open_writer
>     return FileBasedSinkWriter(self, writer_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 395, in __init__
>     self.temp_handle = self.sink.open(temp_shard_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 397, in open
>     file_handle = super(_TextSink, self).open(temp_path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 134, in open
>     return FileSystems.create(temp_path, self.mime_type, self.compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 217, in create
>     return filesystem.create(path, mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 155, in create
>     return self._path_open(path, 'wb', mime_type, compression_type)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", line 137, in _path_open
>     raw_file = open(path, mode)
> FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'
>
> It appears that the filesystem on the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)
>
> Please help me debug. The Flink monitoring dashboard shows the several stages of the job, Map, Reduce and what not… In the end, the status is FAILED.
>
> -Buvana
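
The FileNotFoundError in the quoted traceback is the container-local filesystem issue described above: /tmp inside an SDK harness container is not the /tmp of the submitting machine, and each harness container gets its own. A minimal sketch of a sink that avoids it, assuming either the bind mount from the earlier sketch or a distributed filesystem (the paths are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch: the output path must be visible to every SDK worker,
# not just to the machine that submits the job.
opts = PipelineOptions()  # add --runner/--job_endpoint/--environment_* options as needed
with beam.Pipeline(options=opts) as p:
    (p
     | beam.Create(['hello', 'world'])
     # '/storage1/...' assumes the bind mount sketched earlier; a distributed
     # filesystem path such as 'gs://my-bucket/out/result' (with apache-beam[gcp]
     # installed in the SDK container) is the more scalable alternative.
     | beam.io.WriteToText('/storage1/beam-output/result'))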

> *From:* Kyle Weaver <[email protected]>
> *Reply-To:* "[email protected]" <[email protected]>
> *Date:* Monday, April 13, 2020 at 11:57 AM
> *To:* "[email protected]" <[email protected]>
> *Subject:* Re: SparkRunner on k8s
>
> Hi Buvana,
>
> Running Beam Python on Spark on Kubernetes is more complicated, because Beam has its own solution for running Python code [1]. Unfortunately there's no guide that I know of for Spark yet; however, we do have instructions for Flink [2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and Spark [3] operators, are probably similar enough that it shouldn't be too hard to port the YAML from the Flink operator to the Spark operator. I filed an issue for it [4], but I probably won't have the bandwidth to work on it myself for a while.
>
> - Kyle
>
> [1] https://beam.apache.org/roadmap/portability/
> [2] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
> [3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
> [4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870
>
> On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:
>
> Thank you, Rahul, for your very useful response. Can you please extend your response by commenting on the procedure for a Beam Python pipeline?
>
> *From:* rahul patwari <[email protected]>
> *Reply-To:* "[email protected]" <[email protected]>
> *Date:* Friday, April 10, 2020 at 10:57 PM
> *To:* user <[email protected]>
> *Subject:* Re: SparkRunner on k8s
>
> Hi Buvana,
>
> You can submit a Beam pipeline to Spark on k8s like any other Spark pipeline using the spark-submit script.
>
> Create an Uber Jar of your Beam code and provide it as the primary resource to spark-submit. Provide the k8s master and the container image to use as arguments to spark-submit.
> Refer to https://spark.apache.org/docs/latest/running-on-kubernetes.html to learn more about how to run Spark on k8s.
>
> The Beam pipeline will be translated to a Spark pipeline using Spark APIs at runtime.
>
> Regards,
> Rahul
>
> On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:
>
> Hello,
>
> I newly joined this group and went through the archive to see whether any discussion exists on submitting Beam pipelines to a SparkRunner on k8s.
>
> I run my Spark jobs on a k8s cluster in cluster mode, and I would like to deploy my Beam pipeline on a SparkRunner with k8s underneath.
>
> The Beam documentation:
> https://beam.apache.org/documentation/runners/spark/
> does not discuss k8s (though there is mention of Mesos and YARN).
>
> Can someone please point me to relevant material in this regard? Or, provide the steps for running my Beam pipeline in this configuration?
>
> Thank you,
> Regards,
> Buvana
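
For completeness, a rough sketch of the spark-submit invocation Rahul describes above, for a Beam Java pipeline packaged as an uber jar and run on Kubernetes. The API server address, image name, main class, jar path and executor count are all placeholders; see the Spark-on-k8s docs linked above for the real values:

spark-submit \
  --master k8s://https://<k8s-apiserver-host>:6443 \
  --deploy-mode cluster \
  --name beam-on-spark \
  --class com.example.MyBeamPipeline \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.container.image=<spark-image-containing-the-uber-jar> \
  local:///opt/jars/my-beam-pipeline-bundled.jar \
  --runner=SparkRunner

The trailing --runner=SparkRunner is an application argument consumed by Beam's PipelineOptions, not by spark-submit; everything before the jar path is plain Spark-on-Kubernetes configuration.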
