+dev
The beam_fn_api flag and the way it is automatically set are error-prone.
Is there anything that prevents us from removing it? I understand that
some runners, e.g. the Dataflow runner, have two modes of executing
Python pipelines (legacy and portable), but at this point it seems clear
that the portability mode should be the default.
Cheers,
Max
On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
<[email protected]> wrote:
Kyle
Thank you for the assistance.
By specifying "experiments" in PipelineOptions,
==========================================
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api"
])
==========================================
I was able to submit the job successfully.
[grpc-default-executor-0] INFO
org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[grpc-default-executor-0] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
Starting job invocation
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
pipeline to Flink program.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
a Batch Execution Environment.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
Flink Master URL localhost:8081.
[flink-runner-job-invoker] WARN
org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
default parallelism could be found. Defaulting to parallelism 1.
Please set an explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO
org.apache.flink.api.java.ExecutionEnvironment - The job has 0
registered types and 0 default Kryo serializers
[flink-runner-job-invoker] INFO
org.apache.flink.configuration.Configuration - Config uses fallback
configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
[flink-runner-job-invoker] INFO
org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-invoker] INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting
job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
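For anyone hitting the same issue on 2.15, a small helper that builds
these portable FlinkRunner option args makes the workaround easy to
toggle once 2.16 lands (the helper name is hypothetical; the flag values
are the ones used above):

```python
def flink_portable_args(master_url, flink_version="1.8", use_fn_api=True):
    """Build pipeline-option args for the portable FlinkRunner.

    use_fn_api appends the beam_fn_api experiment, the workaround
    needed on Beam 2.15 (fixed in 2.16).
    """
    args = [
        "--runner=FlinkRunner",
        "--flink_version=" + flink_version,
        "--flink_master_url=" + master_url,
    ]
    if use_fn_api:
        args.append("--experiments=beam_fn_api")
    return args

# The result can be passed straight to PipelineOptions:
#   options = PipelineOptions(flink_portable_args("localhost:8081"))
```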
Thanks,
Yu Watanabe
On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <[email protected]> wrote:
Try adding "--experiments=beam_fn_api" to your pipeline options.
(This is a known issue with Beam 2.15 that will be fixed in 2.16.)
Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
<[email protected]> wrote:
Hello.
I am trying to spin up the Flink runner, but it looks like data
serialization is failing.
I would like to ask for help getting past this error.
========================================================================
[flink-runner-job-invoker] ERROR
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
- Error during job invocation
BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
java.lang.IllegalArgumentException: unable to deserialize
BoundedSource
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
at
org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
at
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
at
org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at
org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
at
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
at
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
at
org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
... 13 more
========================================================================
My Beam version is below.
=======================================================================
(python) ywatanabe@debian-09-00:~$ pip3 freeze | grep
apache-beam
apache-beam==2.15.0
=======================================================================
I have my harness container ready on the registry.
=======================================================================
ywatanabe@debian-09-00:~$ docker search
ywatanabe-docker-apache.bintray.io/python3
NAME            DESCRIPTION    STARS    OFFICIAL    AUTOMATED
beam/python3                   0
=======================================================================
Flink is ready on a separate cluster.
=======================================================================
(python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
tcp LISTEN 0 128 :::8081 :::*
=======================================================================
My Debian version.
=======================================================================
(python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
9.11
=======================================================================
My code snippet is below.
=======================================================================
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081"
])

with beam.Pipeline(options=options) as p:
    (p | beam.Create(["Hello World"]))
=======================================================================
Are there any other settings I should look for?
Thanks,
Yu Watanabe
--
Yu Watanabe
Weekend Freelancer who loves the challenge of building data platforms
[email protected]
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis