Kyle, thank you for the advice.
> For example, Yu's pipeline errored here because the expected Docker
> container wasn't built before running.

I was able to spin up the harness container and submit the job to the job
service by preparing the container properly. I needed some extra steps beyond
the online instructions. What I did is probably something you already know,
but below is what I have done; the steps marked (*) are the extra ones.

============================================================================
https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace
py35 with the Python version of your choice):

    ./gradlew :sdks:python:container:py35:docker

*2. Prepare a bintray account (https://bintray.com/)

*3. Push the image to the bintray registry using "docker push" (mentioned
here => https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md)

*4. Log in to the bintray account with "docker login"

5. Start the JobService endpoint:

    ./gradlew :runners:flink:1.5:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline
to. The JobService will create a Flink job for the pipeline and execute the
job. To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.

6. Submit the Python pipeline to the above endpoint by using the
PortableRunner with job_endpoint set to localhost:8099 (this is the default
address of the JobService). For example:
============================================================================

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver <[email protected]> wrote:

> +dev <[email protected]> I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with a similar optimization for local
>> docker).
>>
>> At the very least, however, we should add obvious messaging when the
>> local filesystem is used from within docker, which is often a
>> (non-obvious and hard-to-debug) mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp"
>>> is written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
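To make the loopback suggestion concrete: the sketch below submits the same
pipeline with --environment_type=LOOPBACK, so the SDK worker runs inside the
submitting process and no Docker image is needed; writes under /tmp then land
on the host filesystem. This is a minimal sketch, assuming a JobService is
already listening on localhost:8099 and a Beam release that supports the
LOOPBACK environment; the output path is illustrative.

    #!/usr/bin/env python3
    # Minimal sketch: run against the Flink JobService without Docker by
    # using the LOOPBACK environment. The SDK harness runs in this process,
    # so WriteToText('/tmp/...') writes to the host filesystem.
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",  # default JobService address
        "--environment_type=LOOPBACK",    # no SDK harness container needed
    ])

    # The context manager runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "write" >> WriteToText("/tmp/sample"))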
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <[email protected]>
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code, which uses the
>>>> portable runner with Apache Flink.
>>>> I was able to get wordcount.py working by following this page:
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got the two files below under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
>>>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-00000-of-00002
>>>>
>>>> Then I wrote sample code with the steps below.
>>>>
>>>> 1. Installed apache_beam using pip3, separate from the source code
>>>> directory.
>>>> 2. Wrote the sample code below and named it "test-portable-runner.py".
>>>> Placed it in a separate directory from the source code.
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>> total 16
>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py
>>>> -----------------------------------------------------------------------------------
>>>>
>>>> 3. Executed the code with "python3 test-portable-runner.py".
>>>>
>>>> ==========================================================================================
>>>> #!/usr/bin/env python3
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.io import WriteToText
>>>>
>>>>
>>>> def printMsg(line):
>>>>     # Print each element as it passes through, then pass it along.
>>>>     print("OUTPUT: {0}".format(line))
>>>>     return line
>>>>
>>>>
>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>                            "--job_endpoint=localhost:8099",
>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>             | beam.Map(printMsg))
>>>>
>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>
>>>> result = p.run()
>>>> result.wait_until_finish()
>>>> =======================================================================================
>>>>
>>>> The job seemed to go all the way to the FINISHED state.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to DEPLOYING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0) to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>> [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> But I ended up with a docker error on the client side.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84: UserWarning: Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam.
>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by '
>>>> ERROR:root:java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'. stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>> Traceback (most recent call last):
>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>     result.wait_until_finish()
>>>>   File "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 446, in wait_until_finish
>>>>     self._job_id, self._state, self._last_error_message()))
>>>> RuntimeError: Pipeline BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9 failed in state FAILED: java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'. stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not found.
>>>> See 'docker run --help'.
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> As a result, I got nothing under /tmp. The code works when using
>>>> DirectRunner.
>>>> May I ask where I should look in order to get the pipeline to write
>>>> its results to text files under /tmp?
>>>>
>>>> Best Regards,
>>>> Yu Watanabe
>>>>
>>>> --
>>>> Yu Watanabe
>>>> Weekend freelancer who loves the challenge of building data platforms
>>>> [email protected]
>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>> Twitter: https://twitter.com/yuwtennis

--
Yu Watanabe
Weekend freelancer who loves the challenge of building data platforms
[email protected]
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis
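For completeness, the "Unable to find image" failure above is the symptom
Kyle described: the runner defaulted to an image named after the local user
({user}-docker-apache.bintray.io/beam/python3:latest), docker could not find
it locally, and the fallback pull from bintray was rejected. A minimal
pre-flight sketch, assuming the Gradle task quoted from the Flink runner
docs and that the built tag must match the image name in the error message;
the <built-image> placeholder stands for whatever tag your build actually
produced:

    # Build the Python SDK harness image once, from the Beam source tree.
    ./gradlew :sdks:python:container:py35:docker

    # Confirm docker can resolve the image locally before submitting,
    # so `docker run` does not attempt a remote pull.
    docker images | grep beam/python

    # If the built tag differs from the one the runner expects, retag it;
    # the target name below is taken from the error message above.
    docker tag <built-image> ywatanabe-docker-apache.bintray.io/beam/python3:latest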
