Hello,
I tried to follow the instructions at
https://beam.apache.org/roadmap/portability/#python-on-flink:
1. I installed a local Flink cluster, followed their SocketWindowWordCount
example, and confirmed the cluster works properly (see the command sketch
after these steps).
2. Started the Flink job server:
./gradlew :beam-runners-flink_2.11-job-server:runShadow
-PflinkMasterUrl=localhost:8081
3. Submitted the job as suggested by an earlier thread:
python -m apache_beam.examples.wordcount --input=/etc/profile
--output=/tmp/py-wordcount-direct --runner=PortableRunner
--job_endpoint=localhost:8099 --parallelism=1
--flink_master=localhost:8081 --streaming
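
For reference, step 1 amounted to roughly the following (a sketch based on
the Flink quickstart, assuming the standard Flink distribution layout):

# start the local cluster from the Flink distribution root
./bin/start-cluster.sh
# in a second terminal, open the socket the example reads from
nc -l 9000
# run the bundled example against that socket
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000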
But I got the NullPointerException below (sorry for the long log). Any
ideas? Thanks,
Jun Wan
---- log starts ----
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to Flink program.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for field unionTag
[flink-runner-job-server] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
[... the three TypeExtractor INFO messages above repeat many more times ...]
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-server] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 9a91889c469db1d88ec8f6a6d04a67b7 (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
    at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
    at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
java.util.concurrent.CompletionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1107)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.jobgraph.JobGraph.writeUserArtifactEntriesToConfiguration(JobGraph.java:586)
    at org.apache.flink.runtime.client.ClientUtils.setUserArtifactBlobKeys(ClientUtils.java:140)
    at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserArtifacts(ClientUtils.java:121)
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:78)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:168)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
    ... 6 more
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:349)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:333)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
---- log ends ----
On 2018/11/15 11:01:28, Maximilian Michels <[email protected]> wrote:
> Hi Ruoyun,
>
> The output file will be within the container, which is deleted after
> shutdown by default. You can keep the containers if you add the flag
>
> --retain_docker_containers
>
> Note, this is from ManualDockerEnvironmentOptions.
>
> The problem with batch is that it executes in stages and will create
> multiple containers [1] which don't share the same local file system. So
> the wordcount only works reliably if you use a distributed file system.
>
> Cheers,
> Max
>
> [1] You can prevent multiple containers by using
> --environment_cache_millis=10000
>
> On 14.11.18 20:44, Ruoyun Huang wrote:
> > Thanks Thomas!
> >
> > My desktop runs Linux. I was using Gradle to run wordcount, and that
> > was how I got the job hanging. Since both of you got it working, I
> > guess it is more likely that something is wrong with my setup.
> >
> >
> > Using Thomas's python command line exactly as is, I am able to see
> > the job run succeed; however, two questions:
> >
> > 1) Did you check whether the output file "/tmp/py-wordcount-direct"
> > exists or not? I expect there to be text output, but I don't see this
> > file afterwards. (I am still building confidence in telling what a
> > successful run looks like. Maybe I will try DataflowRunner and
> > cross-check outputs.)
> >
> > 2) Why does it need a "--streaming" arg? Isn't this static batch
> > input, since we feed it a text file? In fact, I get a failure message
> > if I remove '--streaming'; not sure if that is due to my setup again.
> >
> >
> > On Wed, Nov 14, 2018 at 7:51 AM Thomas Weise <[email protected]> wrote:
> >
> > Works for me on macOS as well.
> >
> > In case you don't launch the pipeline through Gradle, this would be
> > the command:
> >
> > python -m apache_beam.examples.wordcount \
> > --input=/etc/profile \
> > --output=/tmp/py-wordcount-direct \
> > --runner=PortableRunner \
> > --job_endpoint=localhost:8099 \
> > --parallelism=1 \
> > --flink_master=localhost:8081 \
> > --streaming
> >
> > We talked about adding the wordcount to pre-commit...
> >
> > Regarding using ULR vs. Flink runner: There seems to be confusion
> > between PortableRunner using the user-supplied endpoint vs. trying
> > to launch a job server. I commented in the doc.
> >
> > Thomas
> >
> >
> >
> > On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <[email protected]> wrote:
> >
> > Hi Ruoyun,
> >
> > I just ran the wordcount locally using the instructions on the page.
> > I've tried the local file system and GCS. Both times it ran
> > successfully and produced valid output.
> >
> > I'm assuming there is some problem with your setup. Which platform
> > are you using? I'm on macOS.
> >
> > Could you expand on the planned merge? From my understanding we will
> > always need PortableRunner in Python to be able to submit against
> > the Beam JobServer.
> >
> > Thanks,
> > Max
> >
> > On 14.11.18 00:39, Ruoyun Huang wrote:
> > > A quick follow-up on using the current PortableRunner.
> > >
> > > I followed the exact three steps that Ankur and Maximilian shared in
> > > https://beam.apache.org/roadmap/portability/#python-on-flink. The
> > > wordcount example keeps hanging after 10 minutes. I also tried
> > > specifying explicit input/output args, using either a GCS folder or
> > > the local file system, but none of them worked.
> > >
> > > Spent some time looking into it, but no conclusion yet. At this
> > > point though, I guess it does not matter much any more, given we
> > > already plan to merge PortableRunner into the Java reference
> > > runner (i.e. :beam-runners-reference-job-server).
> > >
> > > Still appreciated if someone can try out the python-on-flink
> > > instructions
> > > (https://beam.apache.org/roadmap/portability/#python-on-flink)
> > > in case it is just due to my local machine setup. Thanks!
> > >
> > >
> > >
> > > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <[email protected]> wrote:
> > >
> > > Thanks Maximilian!
> > >
> > > I am working on migrating the existing PortableRunner to the Java
> > > ULR (link to notes:
> > > https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#).
> > > If this issue is non-trivial to solve, I would vote for removing
> > > this default behavior as part of the consolidation.
> > >
> > > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > In the long run, we should get rid of the Docker-inside-Docker
> > > approach, which was only intended for testing anyway. It would be
> > > cleaner to start the SDK harness container alongside the JobServer
> > > container.
> > >
> > > Short term, I think it should be easy to either fix the permissions
> > > of the mounted "docker" executable or use a Docker image for the
> > > JobServer which comes with Docker pre-installed.
> > >
> > > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> > >
> > > Thanks for reporting this Ruoyun!
> > >
> > > -Max
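
A side note on Max's second option above: a minimal sketch of such an
image could look like the two-line Dockerfile below. The base image name
is a placeholder, not a published image:

FROM <your-jobserver-base-image>
# install the Docker CLI so SDK harness containers can be launched inside
RUN apt-get update && apt-get install -y docker.io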
> > >
> > > On 08.11.18 00:10, Ruoyun Huang wrote:
> > > > Thanks Ankur and Maximilian.
> > > >
> > > > Just for reference, in case other people encounter the same error
> > > > message: the "permission denied" error in my original email is
> > > > exactly due to the Docker-inside-Docker issue that Ankur mentioned.
> > > > Thanks Ankur! I didn't make the connection when you said it, and
> > > > had to discover it the hard way (I thought my Docker installation
> > > > was messed up).
> > > >
> > > > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <[email protected]> wrote:
> > > >
> > > > Hi,
> > > >
> > > > Please follow
> > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On 06.11.18 01:14, Ankur Goenka wrote:
> > > > > Hi,
> > > > >
> > > > > The Portable Runner requires a job server URI to work with. The
> > > > > current default job server docker image is broken because of the
> > > > > Docker-inside-Docker issue.
> > > > >
> > > > > Please refer to
> > > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > > > for how to run a wordcount using the Portable Flink Runner.
> > > > >
> > > > > Thanks,
> > > > > Ankur
> > > > >
> > > > > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <[email protected]> wrote:
> > > > >
> > > > > Hi, Folks,
> > > > >
> > > > > I want to try out the Python PortableRunner, using the
> > > > > following command:
> > > > >
> > > > > sdk/python: python -m apache_beam.examples.wordcount
> > > > > --output=/tmp/test_output --runner PortableRunner
> > > > >
> > > > > It complains with the following error message:
> > > > >
> > > > > Caused by: java.lang.Exception: The user defined 'open()' method
> > > > > caused an exception: java.io.IOException: Cannot run program
> > > > > "docker": error=13, Permission denied
> > > > >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > > > >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> > > > >     ... 1 more
> > > > > Caused by: org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> > > > > java.io.IOException: Cannot run program "docker": error=13,
> > > > > Permission denied
> > > > >     at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> > > > >     ... 7 more
> > > > >
> > > > >
> > > > >
> > > > > My py2 environment is properly configured, because DirectRunner
> > > > > works. I also tested my Docker installation with 'docker run
> > > > > hello-world'; no issue.
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > ================
> > > > > Ruoyun Huang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > ================
> > > > Ruoyun Huang
> > > >
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> >
> >
> >
> > --
> > ================
> > Ruoyun Huang
> >
>