I agree that loopback would be preferable for this purpose. I just wasn't
aware this even works with the portable Flink runner. Is it one of the
best-kept secrets? ;-)

Kyle, can you please post the pipeline options you would use for Flink?
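
As a starting point, the loopback setup being discussed would presumably look
something like the sketch below. The job endpoint is the one from Yu's script
further down; `loopback_flink_args` is a hypothetical helper name, not part of
Beam, and I am assuming a Flink job server is already listening on that
endpoint.

```python
# Sketch only: flags for submitting a Python pipeline to the portable Flink
# runner with the LOOPBACK environment, so no Docker image is needed.
# Assumption: a Flink job server is already running at job_endpoint.

def loopback_flink_args(job_endpoint="localhost:8099"):
    """Return pipeline flags that run user code in the submitting process."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint={}".format(job_endpoint),
        "--environment_type=LOOPBACK",
    ]

# Usage (requires apache_beam to be installed):
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(loopback_flink_args())
```

With loopback, the worker code runs in the same process that submits the job,
so a path such as /tmp/sample.txt should end up on the local filesystem.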


On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <[email protected]> wrote:

> I prefer loopback because a) it writes output files to the local
> filesystem, as the user expects, and b) you don't have to pull or build
> docker images, or even have docker installed on your system -- which is one
> less point of failure.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <[email protected]> wrote:
>
>> This should become much better with 2.16 when we have the Docker images
>> prebuilt.
>>
>> Docker is probably still the best option for Python on a JVM based runner
>> in a local environment that does not have a development setup.
>>
>>
>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <[email protected]> wrote:
>>
>>> +dev <[email protected]> I think we should probably point new users
>>> of the portable Flink/Spark runners to use loopback or some other
>>> non-docker environment, as Docker adds some operational complexity that
>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>> errored here because the expected Docker container wasn't built before
>>> running.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>>
>>>
>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On this note, making local files easy to read is something we'd
>>>> definitely like to improve, as the current behavior is quite surprising.
>>>> This could be useful not just for running with docker and the portable
>>>> runner locally, but more generally when running on a distributed system
>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>>> could automatically stage local files to be read as artifacts that could be
>>>> consumed by any worker (possibly via external directory mounting in the
>>>> local docker case rather than an actual copy), and conversely copy small
>>>> outputs back to the local machine (with a similar optimization for local
>>>> docker).
>>>>
>>>> At the very least, however, we should add obvious messaging when the
>>>> local filesystem is used from within docker, which is often a
>>>> non-obvious and hard-to-debug mistake.
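
To make the kind of messaging suggested here concrete, the sketch below warns
when a scheme-less (local) path is combined with a Docker environment. This is
not how Beam does it today; `is_local_path` and `check_output_path` are
hypothetical names, and treating "no scheme or file://" as local is my own
simplification.

```python
import warnings
from urllib.parse import urlparse


def is_local_path(path):
    """Treat paths with no scheme, or a file:// scheme, as local."""
    return urlparse(path).scheme in ("", "file")


def check_output_path(path, environment_type="DOCKER"):
    """Warn when a local path is used with a Docker environment.

    Returns True if a warning was issued, False otherwise.
    """
    if environment_type.upper() == "DOCKER" and is_local_path(path):
        warnings.warn(
            "Path {!r} is local; with a Docker environment it will be "
            "written inside the container, not on the host.".format(path)
        )
        return True
    return False
```

A check like this could run at pipeline construction time, before any
container is started, so the user sees the message next to their own code.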
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> When you use a local filesystem path and a docker environment, "/tmp"
>>>>> is written inside the container. You can solve this issue by:
>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>> * Mounting an external directory into the container so that any
>>>>> "local" writes appear outside the container
>>>>> * Using a non-docker environment such as external or process.
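
For the third option, the environment is selected through pipeline flags. A
rough sketch follows, assuming an external worker pool is already listening at
the given address and that the SDK harness boot executable exists at the given
path. Both values, and the helper names, are placeholders of mine.

```python
import json


def external_env_args(worker_pool_address):
    """Flags for an already-running external worker pool (address is a placeholder)."""
    return [
        "--environment_type=EXTERNAL",
        "--environment_config={}".format(worker_pool_address),
    ]


def process_env_args(boot_command):
    """Flags for starting the SDK harness as a local process (path is a placeholder)."""
    config = json.dumps({"command": boot_command})
    return [
        "--environment_type=PROCESS",
        "--environment_config={}".format(config),
    ]
```

Either list would be appended to the runner and job endpoint flags before
constructing PipelineOptions.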
>>>>>
>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello.
>>>>>>
>>>>>> I would like to ask for help with my sample code, which uses the
>>>>>> portable runner with Apache Flink.
>>>>>> I was able to get wordcount.py working by following this page.
>>>>>>
>>>>>> https://beam.apache.org/roadmap/portability/
>>>>>>
>>>>>> I got the two files below under /tmp.
>>>>>>
>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    185 Sep 12 19:56
>>>>>> py-wordcount-direct-00001-of-00002
>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe    190 Sep 12 19:56
>>>>>> py-wordcount-direct-00000-of-00002
>>>>>>
>>>>>> Then I wrote sample code with the steps below.
>>>>>>
>>>>>> 1. Installed apache_beam using pip3, separate from the source code
>>>>>> directory.
>>>>>> 2. Wrote the sample code below and named it
>>>>>> "test-portable-runner.py". Placed it in a directory separate from the
>>>>>> source code.
>>>>>>
>>>>>> -----------------------------------------------------------------------------------
>>>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>>>> total 16
>>>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
>>>>>> code directory)
>>>>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>>>>> test-portable-runner.py
>>>>>>
>>>>>> -----------------------------------------------------------------------------------
>>>>>> 3. Executed the code with "python3 test-portable-runner.py"
>>>>>>
>>>>>>
>>>>>> ==========================================================================================
>>>>>> #!/usr/bin/env python3
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from apache_beam.io import WriteToText
>>>>>>
>>>>>>
>>>>>> def printMsg(line):
>>>>>>     # Log each element, then pass it through unchanged.
>>>>>>     print("OUTPUT: {0}".format(line))
>>>>>>     return line
>>>>>>
>>>>>>
>>>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>>>                            "--job_endpoint=localhost:8099",
>>>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>>>
>>>>>> p = beam.Pipeline(options=options)
>>>>>>
>>>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>>>             | beam.Map(printMsg))
>>>>>>
>>>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>>>
>>>>>> # Run the pipeline and block until it finishes. These two lines were
>>>>>> # missing from the posted snippet; the traceback below shows the
>>>>>> # wait_until_finish() call in the script that was actually run.
>>>>>> result = p.run()
>>>>>> result.wait_until_finish()
>>>>>> =======================================================================================
>>>>>>
>>>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) 
>>>>>> [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>>>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from 
>>>>>> DEPLOYING
>>>>>> to RUNNING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN 
>>>>>> MapPartition
>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to 
>>>>>> SCHEDULED.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN 
>>>>>> MapPartition
>>>>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at
>>>>>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>>>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>>>>>> DEPLOYING.
>>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
>>>>>> MapPartition (MapPartition at
>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
>>>>>> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>>>> [flink-akka.actor.default-dispatcher-2] INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
>>>>>> MapPartition (MapPartition at
>>>>>> [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
>>>>>> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) 
>>>>>> [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>>>>> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) 
>>>>>> [DEPLOYING].
>>>>>> [DataSource (Impulse) (1/1)] INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>>>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> But I ended up with a Docker error on the client side.
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
>>>>>> UserWarning: Some syntactic constructs of Python 3 are not yet fully
>>>>>> supported by Apache Beam.
>>>>>>   'Some syntactic constructs of Python 3 are not yet fully supported
>>>>>> by '
>>>>>> ERROR:root:java.io.IOException: Received exit code 125 for command
>>>>>> 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>> stderr: Unable to find image '
>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>> locallydocker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>> not found.See 'docker run --help'.
>>>>>> Traceback (most recent call last):
>>>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>>>     result.wait_until_finish()
>>>>>>   File
>>>>>> "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>> line 446, in wait_until_finish
>>>>>>     self._job_id, self._state, self._last_error_message()))
>>>>>> RuntimeError: Pipeline
>>>>>> BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
>>>>>> failed in state FAILED: java.io.IOException: Received exit code 125 for
>>>>>> command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
>>>>>> --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
>>>>>> --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>> stderr: Unable to find image '
>>>>>> ywatanabe-docker-apache.bintray.io/beam/python3:latest'
>>>>>> locallydocker: Error response from daemon: unknown: Subject ywatanabe was
>>>>>> not found.See 'docker run --help'.
>>>>>>
>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> As a result, I got nothing under /tmp. The code works when using
>>>>>> DirectRunner.
>>>>>> May I ask where I should look in order to get the pipeline to
>>>>>> write results to text files under /tmp?
>>>>>>
>>>>>> Best Regards,
>>>>>> Yu Watanabe
>>>>>>
>>>>>> --
>>>>>> Yu Watanabe
>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>> [email protected]
>>>>>> <https://www.linkedin.com/in/yuwatanabe1>
>>>>>> <https://twitter.com/yuwtennis>
>>>>>>
>>>>>
