I downgraded Flink from 1.7.1 to 1.5.6 and got further, but the job
still fails. Here is the latest error from Flink. Thanks!
The job command I launched: python -m apache_beam.examples.wordcount
--input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner
--job_endpoint=localhost:8099 --parallelism=1
--OPTIONALflink_master=localhost:8081 --streaming
--experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED
--experiments=beam_fn_api
Jun
---- log starts ----
[flink-runner-job-server] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at
localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration -
Config uses deprecated configuration key 'jobmanager.rpc.address' instead of
proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest
client endpoint started.
[flink-runner-job-server] INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting job
4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest
endpoint shutdown complete.
[flink-runner-job-server] ERROR
org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation
BeamApp-jwan-0121211115-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
org.apache.flink.client.program.ProgramInvocationException: Job
4ecb5e5cfd4718de440f48cbfaf7216a failed.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
at
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
at
org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
at
org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
at
org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
at
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
... 12 more
Caused by: java.lang.RuntimeException: Exception occurred while processing
valve output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
... 1 more
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:581)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:540)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:626)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
at
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:677)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.processWatermark(ExecutableStageDoFnOperator.java:471)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Error received from SDK harness for instruction 21: Traceback (most recent call
last):
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 148, in _execute
response = task()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 256, in do_instruction
request.instruction_id)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 270, in process_bundle
request.process_bundle_descriptor_reference) as bundle_processor:
File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
return self.gen.next()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 292, in get_bundle_processor
self.data_channel_factory)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 404, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 448, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 381, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 431, in get_operation
in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 430, in <dictcomp>
for tag, pcoll_id
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 381, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 434, in get_operation
transform_id, transform_consumers)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 584, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 768, in create
serialized_fn, parameter)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line
247, in loads
return dill.loads(s)
File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in
loads
return load(file, ignore)
File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
obj = pik.load()
File "/usr/local/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no
attribute '__dict__'
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
at
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:623)
... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for
instruction 21: Traceback (most recent call last):
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 148, in _execute
response = task()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 256, in do_instruction
request.instruction_id)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 270, in process_bundle
request.process_bundle_descriptor_reference) as bundle_processor:
File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
return self.gen.next()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 292, in get_bundle_processor
self.data_channel_factory)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 404, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 448, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 381, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 431, in get_operation
in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 430, in <dictcomp>
for tag, pcoll_id
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 381, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 434, in get_operation
transform_id, transform_consumers)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 584, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 768, in create
serialized_fn, parameter)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line
247, in loads
return dill.loads(s)
File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in
loads
return load(file, ignore)
File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
obj = pik.load()
File "/usr/local/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no
attribute '__dict__'
at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
---- log ends ----
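The innermost error in the log is raised while unpickling: load_build reads
inst.__dict__, and the PaneInfo class on the worker has none. One way this
kind of failure can arise (a guess, not a confirmed diagnosis of this job) is
that the object was pickled against a plain Python class but is unpickled
against a class of the same name that has no per-instance __dict__, for
example a __slots__ class or a compiled extension type. The sketch below
reproduces only that mechanism. It is written for Python 3 rather than the
2.7 in the log, and the module name fake_windowed_value and both stand-in
classes are hypothetical; nothing here is Beam code.

```python
import pickle
import sys
import types

# Hypothetical stand-in module so pickle can resolve the class by name.
# This is NOT apache_beam.utils.windowed_value.
mod = types.ModuleType("fake_windowed_value")
sys.modules[mod.__name__] = mod


def make_plain_class():
    # "Submitting side": ordinary class, instance state lives in __dict__.
    return type("PaneInfo", (object,), {"__module__": mod.__name__})


def make_slotted_class():
    # "Worker side": same name, but instances have no __dict__.
    return type(
        "PaneInfo",
        (object,),
        {"__module__": mod.__name__, "__slots__": ("index",)},
    )


# Pickle an instance while the plain class is installed.
mod.PaneInfo = make_plain_class()
pane = mod.PaneInfo()
pane.index = 0
payload = pickle.dumps(pane, protocol=2)

# Swap in the slotted class, then try to unpickle the same bytes.
mod.PaneInfo = make_slotted_class()
try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    error = str(exc)

print(error)
```

If the mechanism is the same here, comparing the Beam SDK version that submits
the job with the one installed in the SDK harness would be a reasonable first
check.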
On 2018/11/14 19:57:19, Ruoyun Huang <[email protected]> wrote:
> To answer Maximilian's question.
>
> I am using Linux, debian distribution.
>
> 'Planned merge' probably made it sound bigger than it is. What I really
> meant entails less change than that. More specifically:
>
> 1) The default behavior, where PortableRunner starts a Flink server, is
> confusing to new users.
> 2) The related docs and inline comments are similarly confusing about
> how to connect PortableRunner to a Flink server.
> 3) [Probably no longer an issue] I couldn't get the Flink server example
> working, and I couldn't get the example working on the Java ULR either.
> Both will require debugging to resolve, so I figured we could focus on
> one thing only: the Java ULR part, without worrying about the Flink
> server. Again, this may not be a valid concern, given that the Flink
> problem is most likely due to my setup.
>
>
> On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <[email protected]> wrote:
>
> > Hi Ruoyun,
> >
> > I just ran the wordcount locally using the instructions on the page.
> > I've tried the local file system and GCS. Both times it ran successfully
> > and produced valid output.
> >
> > I'm assuming there is some problem with your setup. Which platform are
> > you using? I'm on MacOS.
> >
> > Could you expand on the planned merge? From my understanding we will
> > always need PortableRunner in Python to be able to submit against the
> > Beam JobServer.
> >
> > Thanks,
> > Max
> >
> > On 14.11.18 00:39, Ruoyun Huang wrote:
> > > A quick follow-up on using current PortableRunner.
> > >
> > > I followed the exact three steps as Ankur and Maximilian shared in
> > > https://beam.apache.org/roadmap/portability/#python-on-flink ; The
> > > wordcount example keeps hanging after 10 minutes. I also tried
> > > specifying explicit input/output args, either using gcs folder or local
> > > file system, but none of them works.
> > >
> > > Spent some time looking into it but no conclusion yet. At this point
> > > though, I guess it does not matter much any more, given we already have
> > > the plan of merging PortableRunner into using java reference runner
> > > (i.e. :beam-runners-reference-job-server).
> > >
> > > Still appreciated if someone can try out the python-on-flink
> > > <https://beam.apache.org/roadmap/portability/#python-on-flink> instructions
> > > in case it is just due to my local machine setup. Thanks!
> > >
> > >
> > >
> > > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <[email protected]> wrote:
> > >
> > > Thanks Maximilian!
> > >
> > > I am working on migrating the existing PortableRunner to use the Java ULR
> > > (Link to Notes
> > > <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
> > > If this issue is non-trivial to solve, I would vote for removing
> > > this default behavior as part of the consolidation.
> > >
> > > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > In the long run, we should get rid of the Docker-inside-Docker
> > > approach, which was only intended for testing anyway. It would be
> > > cleaner to start the SDK harness container alongside the JobServer
> > > container.
> > >
> > > Short term, I think it should be easy to either fix the permissions
> > > of the mounted "docker" executable or use a Docker image for the
> > > JobServer which comes with Docker pre-installed.
> > >
> > > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> > >
> > > Thanks for reporting this Ruoyun!
> > >
> > > -Max
> > >
> > > On 08.11.18 00:10, Ruoyun Huang wrote:
> > > > Thanks Ankur and Maximilian.
> > > >
> > > > Just for reference, in case other people encounter the same error
> > > > message: the "permission denied" error in my original email is
> > > > exactly due to the Docker-inside-Docker issue that Ankur mentioned.
> > > > Thanks Ankur! I didn't make the link when you said it and had to
> > > > discover it the hard way (I thought my Docker installation was
> > > > messed up).
> > > >
> > > > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <[email protected]> wrote:
> > > >
> > > > Hi,
> > > >
> > > > Please follow
> > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On 06.11.18 01:14, Ankur Goenka wrote:
> > > > > Hi,
> > > > >
> > > > > The Portable Runner requires a job server URI to work with. The
> > > > > current default job server Docker image is broken because of the
> > > > > Docker-inside-Docker issue.
> > > > >
> > > > > Please refer to
> > > > > https://beam.apache.org/roadmap/portability/#python-on-flink for
> > > > > how to run a wordcount using the Portable Flink Runner.
> > > > >
> > > > > Thanks,
> > > > > Ankur
> > > > >
> > > > > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <[email protected]> wrote:
> > > > >
> > > > > Hi, Folks,
> > > > >
> > > > > I want to try out the Python PortableRunner, using the following
> > > > > command:
> > > > >
> > > > > *sdk/python: python -m apache_beam.examples.wordcount
> > > > > --output=/tmp/test_output --runner PortableRunner*
> > > > >
> > > > > It complains with the following error message:
> > > > >
> > > > > Caused by: java.lang.Exception: The user defined 'open()' method
> > > > > caused an exception: java.io.IOException: Cannot run program
> > > > > "docker": error=13, Permission denied
> > > > > at
> > > > > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > > > > at
> > > > > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > > > > at
> > > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> > > > > ... 1 more
> > > > > Caused by:
> > > > > org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> > > > > java.io.IOException: Cannot run program "docker": error=13,
> > > > > Permission denied
> > > > > at
> > > > > org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> > > > >
> > > > > ... 7 more
> > > > >
> > > > >
> > > > >
> > > > > My py2 environment is properly configured, because DirectRunner
> > > > > works. I also tested my Docker installation with
> > > > > 'docker run hello-world' and saw no issue.
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > ================
> > > > > Ruoyun Huang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > ================
> > > > Ruoyun Huang
> > > >
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> >
>
>
> --
> ================
> Ruoyun Huang
>