Kyle

Thank you for the advice.

> For example, Yu's pipeline errored here because the expected Docker
container wasn't built before running.

I was able to spin up the harness container and submit a job to the job
service by preparing the container properly.
I needed to do a few extra steps beyond the online instructions.
I suspect these are steps you already know about.

The steps marked with (*) below are the extra ones I did.

============================================================================
https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace
py35 with the Python version of your choice): ./gradlew
:sdks:python:container:py35:docker
*2. Prepare a bintray account (https://bintray.com/)
*3. Push the image to the bintray registry using "docker push" (as
described in https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md)
*4. Log in to the bintray account with "docker login"
5. Start the JobService endpoint: ./gradlew
:runners:flink:1.5:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline
to. The JobService will create a Flink job for the pipeline and execute the
job. To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.

6. Submit the Python pipeline to the above endpoint by using the
PortableRunner and job_endpoint set to localhost:8099 (this is the default
address of the JobService). For example:
============================================================================
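For reference, the example referenced in step 6 looks roughly like the
sketch below (the option names are those of the Beam 2.15 Python SDK; the
pipeline itself is just an illustration):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# PortableRunner hands the pipeline to the JobService started in step 5,
# which turns it into a Flink job and runs it.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
])

with beam.Pipeline(options=options) as p:
    p | 'create' >> beam.Create(["a", "b", "c"])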

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver <[email protected]> wrote:

> +dev <[email protected]> I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
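>
> For instance, a rough sketch of the loopback alternative (assuming the
> Python SDK's --environment_type flag; LOOPBACK runs the SDK worker inside
> the submitting Python process, so no container image is needed and local
> files behave as expected):
>
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Same JobService endpoint as before, but no Docker environment.
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--job_endpoint=localhost:8099",
>     "--environment_type=LOOPBACK",
> ])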
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with a similar optimization for local
>> docker).
>>
>> At the very least, however, we should add obvious messaging when the
>> local filesystem is used from within docker, since that is often a
>> non-obvious and hard-to-debug mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
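>>>
>>> For example, the first option is just a matter of the output path. A
>>> minimal sketch, mirroring the last line of your pipeline (the bucket
>>> name is made up, and writing to gs:// paths needs the apache_beam[gcp]
>>> extra so the GCS filesystem is registered):
>>>
>>> output | 'write' >> WriteToText('gs://my-bucket/sample.txt')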
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <[email protected]>
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code, which uses the
>>>> portable runner with Apache Flink.
>>>> I was able to get wordcount.py working by following this page.
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got the two files below under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56
>>>> py-wordcount-direct-00001-of-00002
>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56
>>>> py-wordcount-direct-00000-of-00002
>>>>
>>>> Then I wrote sample code following the steps below.
>>>>
>>>> 1. Installed apache_beam using pip3, separate from the source code
>>>> directory.
>>>> 2. Wrote the sample code below and named it "test-portable-runner.py",
>>>> placing it in a directory separate from the source code.
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>> total 16
>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
>>>> code directory)
>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>>> test-portable-runner.py
>>>>
>>>> -----------------------------------------------------------------------------------
>>>> 3. Executed the code with "python3 test-portable-runner.py"
>>>>
>>>>
>>>> ==========================================================================================
>>>> #!/usr/bin/env python3
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.io import WriteToText
>>>>
>>>>
>>>> def printMsg(line):
>>>>
>>>>     print("OUTPUT: {0}".format(line))
>>>>
>>>>     return line
>>>>
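>>>> # Submit through the PortableRunner to the JobService at localhost:8099;
>>>> # shutdown_sources_on_final_watermark asks the Flink runner to shut
>>>> # sources down once their watermark reaches +Inf.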
>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>>>              | beam.Map(printMsg)
>>>>          )
>>>>
>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>
>>>> =======================================================================================
>>>>
>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>>> to RUNNING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to 
>>>> SCHEDULED.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>> DEPLOYING.
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>> MapPartition (MapPartition at
>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>> [DataSource (Impulse) (1/1)] INFO
>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> But I ended up with a docker error on the client side.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>> supported by Apache Beam.
>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by
>>>> '
>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not
>>>> found. See 'docker run --help'.
>>>> Traceback (most recent call last):
>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>     result.wait_until_finish()
>>>>   File
>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>> line 446, in wait_until_finish
>>>>     self._job_id, self._state, self._last_error_message()))
>>>> RuntimeError: Pipeline
>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>> stderr: Unable to find image
>>>> 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
>>>> docker: Error response from daemon: unknown: Subject ywatanabe was not
>>>> found. See 'docker run --help'.
>>>>
>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> As a result, I got nothing under /tmp. The code works when using
>>>> DirectRunner.
>>>> May I ask where I should look in order to get the pipeline to write
>>>> its results to text files under /tmp?
>>>>
>>>> Best Regards,
>>>> Yu Watanabe
>>>>
>>>> --
>>>> Yu Watanabe
>>>> Weekend Freelancer who loves to challenge building data platform
>>>> [email protected]
>>>> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>  Twitter
>>>> <https://twitter.com/yuwtennis>
>>>>
>>>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
[email protected]
LinkedIn <https://www.linkedin.com/in/yuwatanabe1>  Twitter
<https://twitter.com/yuwtennis>
