I agree that loopback would be preferable for this purpose. I just wasn't aware this even works with the portable Flink runner. Is it one of the best guarded secrets? ;-)
Kyle, can you please post the pipeline options you would use for Flink?

On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <[email protected]> wrote:

> I prefer loopback because a) it writes output files to the local
> filesystem, as the user expects, and b) you don't have to pull or build
> docker images, or even have docker installed on your system -- which is
> one less point of failure.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <[email protected]> wrote:
>
>> This should become much better with 2.16 when we have the Docker images
>> prebuilt.
>>
>> Docker is probably still the best option for Python on a JVM based
>> runner in a local environment that does not have a development setup.
>>
>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <[email protected]> wrote:
>>
>>> +dev <[email protected]> I think we should probably point new users
>>> of the portable Flink/Spark runners to use loopback or some other
>>> non-docker environment, as Docker adds some operational complexity that
>>> isn't really needed to run a word count example. For example, Yu's
>>> pipeline errored here because the expected Docker container wasn't
>>> built before running.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>>
>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On this note, making local files easy to read is something we'd
>>>> definitely like to improve, as the current behavior is quite
>>>> surprising. This could be useful not just for running with docker and
>>>> the portable runner locally, but more generally when running on a
>>>> distributed system (e.g. a Flink/Spark cluster or Dataflow).
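For reference, the loopback setup Kyle describes comes down to a single extra pipeline option. A minimal sketch, assuming a Flink job server listening on localhost:8099 (the endpoint address is an assumption; point it at your own job server):

```python
# Sketch of portable Flink runner options using the loopback environment.
# The job_endpoint address is an assumption, not a fixed value.
flink_loopback_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    # LOOPBACK runs the Python workers inside the submitting process, so
    # paths like /tmp refer to the local filesystem and no docker image
    # needs to be pulled or built.
    "--environment_type=LOOPBACK",
]

# Typically passed to a pipeline as:
#   beam.Pipeline(options=PipelineOptions(flink_loopback_args))
print(flink_loopback_args)
```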
>>>> It would be very convenient if we could automatically stage local
>>>> files to be read as artifacts that could be consumed by any worker
>>>> (possibly via external directory mounting in the local docker case
>>>> rather than an actual copy), and conversely copy small outputs back
>>>> to the local machine (with a similar optimization for local docker).
>>>>
>>>> At the very least, however, obvious messaging should be added when the
>>>> local filesystem is used from within docker, as this is often a
>>>> (non-obvious and hard-to-debug) mistake.
>>>>
>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> When you use a local filesystem path and a docker environment, "/tmp"
>>>>> is written inside the container. You can solve this issue by:
>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>> * Mounting an external directory into the container so that any
>>>>>   "local" writes appear outside the container
>>>>> * Using a non-docker environment such as external or process.
>>>>>
>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <[email protected]> wrote:
>>>>>
>>>>>> Hello.
>>>>>>
>>>>>> I would like to ask for help with my sample code, which uses the
>>>>>> portable runner with Apache Flink.
>>>>>> I was able to work out wordcount.py using this page:
>>>>>>
>>>>>> https://beam.apache.org/roadmap/portability/
>>>>>>
>>>>>> I got the two files below under /tmp:
>>>>>>
>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-00000-of-00002
>>>>>>
>>>>>> Then I wrote sample code with the steps below:
>>>>>>
>>>>>> 1. Installed apache_beam using pip3, separate from the source code
>>>>>>    directory.
>>>>>> 2. Wrote the sample code below and named it "test-portable-runner.py".
>>>>>>    Placed it in a separate directory from the source code.
>>>>>>
>>>>>> -----------------------------------------------------------------------------------
>>>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>>>> total 16
>>>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 634 Sep 12 20:25 test-portable-runner.py
>>>>>> -----------------------------------------------------------------------------------
>>>>>>
>>>>>> 3. Executed the code with "python3 test-portable-runner.py".
>>>>>>
>>>>>> ==========================================================================================
>>>>>> #!/usr/bin/env python3
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from apache_beam.io import WriteToText
>>>>>>
>>>>>>
>>>>>> def printMsg(line):
>>>>>>     print("OUTPUT: {0}".format(line))
>>>>>>     return line
>>>>>>
>>>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>>>                            "--job_endpoint=localhost:8099",
>>>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>>>
>>>>>> p = beam.Pipeline(options=options)
>>>>>>
>>>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>>>             | beam.Map(printMsg))
>>>>>>
>>>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>>>
>>>>>> result = p.run()
>>>>>> result.wait_until_finish()
>>>>>> =======================================================================================
>>>>>>
>>>>>> The job seemed to go all the way to the FINISHED state:
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to DEPLOYING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0) to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>>>> [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> But I ended up with a docker error on the client side:
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84: UserWarning: Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam.
>>>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by '
>>>>>> ERROR:root:java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>> stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>>>> See 'docker run --help'.
>>>>>> Traceback (most recent call last):
>>>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>>>     result.wait_until_finish()
>>>>>>   File "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 446, in wait_until_finish
>>>>>>     self._job_id, self._state, self._last_error_message()))
>>>>>> RuntimeError: Pipeline BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9 failed in state FAILED: java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>> stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>>>> See 'docker run --help'.
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> As a result, I got nothing under /tmp. The code works when using
>>>>>> DirectRunner.
>>>>>> May I ask where I should look in order to get the pipeline to write
>>>>>> its results to text files under /tmp?
>>>>>>
>>>>>> Best Regards,
>>>>>> Yu Watanabe
>>>>>>
>>>>>> --
>>>>>> Yu Watanabe
>>>>>> Weekend freelancer who loves the challenge of building data platforms
>>>>>> [email protected]
>>>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
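For anyone landing on this thread with the same docker error: below is a hedged sketch of Yu's script rewritten to use the loopback environment instead of docker, per Kyle's and Lukasz's suggestions. It assumes a Beam version with portability support and a Flink job server on localhost:8099 (both assumptions); the final call is commented out because it needs that server running, and the imports sit inside the function so the sketch can be read without apache_beam installed.

```python
def run():
    # Imports live inside the function so this sketch can be inspected
    # without apache_beam installed.
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",  # assumed job server address
        # LOOPBACK replaces the docker environment: workers run in this
        # process, so the /tmp below really is the host's /tmp.
        "--environment_type=LOOPBACK",
    ])

    # The context manager runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         | 'create' >> beam.Create(["a", "b", "c"])
         | 'write' >> WriteToText('/tmp/sample.txt'))

# run()  # uncomment once a Flink job server is listening on localhost:8099
```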
