Try adding "--experiments=beam_fn_api" to your pipeline options. (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
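For example, here is a minimal sketch of your pipeline with that flag added. It assumes the same options as your snippet (Flink 1.8, master at localhost:8081). The names FLINK_ARGS and run() are only illustrative and are not part of Beam:

```python
# Arguments from the original snippet, plus the experiments flag suggested
# above as a workaround for the Beam 2.15 "unable to deserialize
# BoundedSource" error on the Flink runner.
FLINK_ARGS = [
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api",
]


def run(argv=None):
    """Build and run the Hello World pipeline on the Flink runner.

    Beam is imported inside the function so FLINK_ARGS can be inspected
    without apache-beam installed. Actually calling run() requires
    apache-beam and a reachable Flink master.
    """
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Fall back to the hard-coded arguments when none are passed in.
    options = PipelineOptions(argv if argv is not None else FLINK_ARGS)
    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create(["Hello World"])
```

Call run() once Flink is up. Because the issue is expected to be fixed in 2.16, the experiments entry should presumably become unnecessary after upgrading.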
Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com

On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Hello.
>
> I am trying to spin up the Flink runner, but it looks like data
> serialization is failing.
> I would like to ask for help getting past this error.
>
> ========================================================================
> [flink-runner-job-invoker] ERROR
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
> during job invocation
> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> ... 13 more
> ========================================================================
>
> My Beam version is below.
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> apache-beam==2.15.0
> =======================================================================
>
> I have my harness container ready in the registry.
>
> =======================================================================
> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> NAME            DESCRIPTION   STARS   OFFICIAL   AUTOMATED
> beam/python3                  0
> =======================================================================
>
> Flink is running on a separate cluster.
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> tcp   LISTEN   0   128   :::8081   :::*
> =======================================================================
>
> My Debian version:
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> 9.11
> =======================================================================
>
> My code snippet is below.
>
> =======================================================================
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> options = PipelineOptions([
>     "--runner=FlinkRunner",
>     "--flink_version=1.8",
>     "--flink_master_url=localhost:8081"
> ])
>
> with beam.Pipeline(options=options) as p:
>     (p | beam.Create(["Hello World"]))
> =======================================================================
>
> Are there any other settings I should look for?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
> Twitter <https://twitter.com/yuwtennis>