> It appears that the filesystem in the client side is not the same as the environment that Flink creates to run the Beam pipeline (I think Flink does a docker run of the python sdk to run the Beam pipeline? In that case, how would the container know where to write the file?)
You are correct. Beam Python execution takes place within a Docker container, or often multiple containers, depending on your pipeline and configuration. Multiple containers is probably the cause of the error here. The Python SDK doesn't do anything special with local file paths; it just writes them to the local file system of the container. So in order to get a persistent, shared file system, you have a couple options. One is you can mount a directory from the Docker host inside the container(s). But the more scalable solution is to use a distributed file system, such as HDFS, Google Cloud Storage, or Amazon S3. Check out the Beam programming guide for more info: https://beam.apache.org/documentation/programming-guide/#pipeline-io On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill) < [email protected]> wrote: > Kyle, > > > > Thanks a lot for the pointers. I got interested to run my beam pipeline on > FlinkRunner and got a local Flink cluster setup, tested a sample code to > work fine. > > > > I started the Beam job runner going: > > docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master > $IP:8081 --job-host $IP --job-port 8099 > > > > Submitted a beam pipeline, which when run with LocalRunner works totally > fine. The last stage of the pipeline code looks as follows: > > . . . > > . . . > > . . . > > output= ( > > { > > 'Mean Open': mean_open, > > 'Mean Close': mean_close > > } | > > beam.CoGroupByKey() | > > beam.io.WriteToText(args.output) > > ) > > > > So, we are ending the pipeline with a io.WriteToText() > > > > Now, when I supply a filename, whether residing in local disk (/tmp) or > network mounted disk(e.g /nas2), I get the following error: > > python test-beam.py –input data/sp500.csv –output /tmp/result.txt > > > > WARNING:root:Make sure that locally built Python SDK docker image has > Python 3.6 interpreter. > > ERROR:root:java.lang.RuntimeException: Error received from SDK harness for > instruction 2: Traceback (most recent call last): > > File "apache_beam/runners/common.py", line 883, in > apache_beam.runners.common.DoFnRunner.process > > File "apache_beam/runners/common.py", line 667, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > > File "apache_beam/runners/common.py", line 748, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > > File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", > line 1095, in _finalize_write > > writer = sink.open_writer(init_result, str(uuid.uuid4())) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", > line 140, in _f > > return fnc(self, *args, **kwargs) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", > line 191, in open_writer > > return FileBasedSinkWriter(self, writer_path) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", > line 395, in __init__ > > self.temp_handle = self.sink.open(temp_shard_path) > > File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", > line 397, in open > > file_handle = super(_TextSink, self).open(temp_path) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", > line 140, in _f > > return fnc(self, *args, **kwargs) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", > line 134, in open > > return FileSystems.create(temp_path, self.mime_type, > self.compression_type) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", > line 217, in create > > return filesystem.create(path, mime_type, compression_type) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", > line 155, in create > > return self._path_open(path, 'wb', mime_type, compression_type) > > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", > line 137, in _path_open > > raw_file = open(path, mode) > > FileNotFoundError: [Errno 2] No such file or directory: > '/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt' > > > > > > It appears that the filesystem in the client side is not the same as the > environment that Flink creates to run the Beam pipeline (I think Flink does > a docker run of the python sdk to run the Beam pipeline? In that case, how > would the container know where to write the file?) > > > > Please help me debug. The Flink monitoring dashboard shows the several > stages of the job, Map, Reduce and what not… In the end, the status is > FAILED. > > > > -Buvana > > > > *From: *Kyle Weaver <[email protected]> > *Reply-To: *"[email protected]" <[email protected]> > *Date: *Monday, April 13, 2020 at 11:57 AM > *To: *"[email protected]" <[email protected]> > *Subject: *Re: SparkRunner on k8s > > > > Hi Buvana, > > > > Running Beam Python on Spark on Kubernetes is more complicated, because > Beam has its own solution for running Python code [1]. Unfortunately > there's no guide that I know of for Spark yet, however we do have > instructions for Flink [2]. Beam's Flink and Spark runners, and I assume > GCP's (unofficial) Flink and Spark [3] operators, are probably similar > enough that it shouldn't be too hard to port the YAML from the Flink > operator to the Spark operator. I filed an issue for it [4], but I probably > won't have the bandwidth to work on it myself for a while. > > > > - Kyle > > > > [1] https://beam.apache.org/roadmap/portability/ > > [2] > https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md > > [3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/ > > [4] > https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870 > > > > On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) < > [email protected]> wrote: > > Thank you, Rahul for your very useful response. Can you please extend your > response by commenting on the procedure for Beam python pipeline? > > > > *From: *rahul patwari <[email protected]> > *Reply-To: *"[email protected]" <[email protected]> > *Date: *Friday, April 10, 2020 at 10:57 PM > *To: *user <[email protected]> > *Subject: *Re: SparkRunner on k8s > > > > Hi Buvana, > > > > You can submit a Beam Pipeline to Spark on k8s like any other Spark > Pipeline using the spark-submit script. > > > > Create an Uber Jar of your Beam code and provide it as the primary > resource to spark-submit. Provide the k8s master and the container image to > use as arguments to spark-submit. > > Refer https://spark.apache.org/docs/latest/running-on-kubernetes.html to > know more about how to run Spark on k8s. > > > > The Beam pipeline will be translated to a Spark Pipeline using Spark APIs > in Runtime. > > > > Regards, > > Rahul > > > > On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) < > [email protected]> wrote: > > Hello, > > > > I newly joined this group and I went through the archive to see if any > discussion exists on submitting Beam pipelines to a SparkRunner on k8s. > > > > I run my Spark jobs on a k8s cluster in the cluster mode. Would like to > deploy my beam pipeline on a SparkRunner with k8s underneath. > > > > The Beam documentation: > > https://beam.apache.org/documentation/runners/spark/ > > does not discuss about k8s (though there is mention of Mesos and YARN). > > > > Can someone please point me to relevant material in this regard? Or, > provide the steps for running my beam pipeline in this configuration? > > > > Thank you, > > Regards, > > Buvana > >
