Could you make that change and see if it would have addressed the issue here?
On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:

> The flag is automatically set, but not in a smart way. Taking another look
> at the code, a more resilient fix would be to just check if the runner is
> an instance of PortableRunner.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
>
>> Is not this flag set automatically for the portable runner here [1]?
>>
>> [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>>
>> On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> +1 for making --experiments=beam_fn_api default.
>>>>
>>>> Can the Dataflow runner driver just remove the setting if it is not
>>>> compatible?
>>>
>>> The tricky bit would be undoing the differences in graph construction
>>> due to this flag flip. But I would be in favor of changing the default
>>> (probably just removing the flag) and moving the non-portability parts
>>> into the Dataflow runner itself. (It looks like the key differences
>>> here are for the Create and Read transforms.)
>>>
>>>> On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> The beam_fn_api flag and the way it is automatically set is error-prone.
>>>>> Is there anything that prevents us from removing it? I understand that
>>>>> some runners, e.g. the Dataflow runner, have two modes of executing
>>>>> Python pipelines (legacy and portable), but at this point it seems clear
>>>>> that the portability mode should be the default.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>
>>>>>> Kyle
>>>>>>
>>>>>> Thank you for the assistance.
>>>>>>
>>>>>> By specifying "experiments" in PipelineOptions,
>>>>>>
>>>>>> ==========================================
>>>>>> options = PipelineOptions([
>>>>>>     "--runner=FlinkRunner",
>>>>>>     "--flink_version=1.8",
>>>>>>     "--flink_master_url=localhost:8081",
>>>>>>     "--experiments=beam_fn_api"
>>>>>> ])
>>>>>> ==========================================
>>>>>>
>>>>>> I was able to submit the job successfully.
>>>>>>
>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>>>>>> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>>>>>
>>>>>> Thanks,
>>>>>> Yu Watanabe
>>>>>>
>>>>>> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
>>>>>>
>>>>>>> Try adding "--experiments=beam_fn_api" to your pipeline options.
>>>>>>> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
>>>>>>>
>>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>>>>>
>>>>>>> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello.
>>>>>>>>
>>>>>>>> I am trying to spin up the Flink runner, but it looks like data
>>>>>>>> serialization is failing.
>>>>>>>> I would like to ask for help to get past this error.
>>>>>>>>
>>>>>>>> ========================================================================
>>>>>>>> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>>>>>>>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>>>>>>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>>>>>>>         at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>>>>>>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>>>>>>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>>>>>>>         at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>>>>>>>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>>>>>>>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>>>>>>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>>>>>>>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>>>>>>>         at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>>>>>>>         at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>>>>>>>         at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>>>>>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>>>>>>>         ... 13 more
>>>>>>>> ========================================================================
>>>>>>>>
>>>>>>>> My Beam version is below.
>>>>>>>>
>>>>>>>> =======================================================================
>>>>>>>> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>>>>>>>> apache-beam==2.15.0
>>>>>>>> =======================================================================
>>>>>>>>
>>>>>>>> I have my harness container ready on the registry.
>>>>>>>>
>>>>>>>> =======================================================================
>>>>>>>> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>>>>>>>> NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
>>>>>>>> beam/python3                 0
>>>>>>>> =======================================================================
>>>>>>>>
>>>>>>>> Flink is ready on a separate cluster.
>>>>>>>>
>>>>>>>> =======================================================================
>>>>>>>> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>>>>>>>> tcp    LISTEN   0   128   :::8081   :::*
>>>>>>>> =======================================================================
>>>>>>>>
>>>>>>>> My Debian version:
>>>>>>>>
>>>>>>>> =======================================================================
>>>>>>>> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>>>>>>>> 9.11
>>>>>>>> =======================================================================
>>>>>>>>
>>>>>>>> My code snippet is below.
>>>>>>>>
>>>>>>>> =======================================================================
>>>>>>>> options = PipelineOptions([
>>>>>>>>     "--runner=FlinkRunner",
>>>>>>>>     "--flink_version=1.8",
>>>>>>>>     "--flink_master_url=localhost:8081"
>>>>>>>> ])
>>>>>>>>
>>>>>>>> with beam.Pipeline(options=options) as p:
>>>>>>>>     (p | beam.Create(["Hello World"]))
>>>>>>>> =======================================================================
>>>>>>>>
>>>>>>>> Are there any other settings I should look for?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Yu Watanabe
>>>>>>>>
>>>>>>>> --
>>>>>>>> Yu Watanabe
>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>> yu.w.ten...@gmail.com
>>>>>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>>>>>> Twitter: https://twitter.com/yuwtennis
>>>>>>
>>>>>> --
>>>>>> Yu Watanabe
>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>> yu.w.ten...@gmail.com
>>>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>>>> Twitter: https://twitter.com/yuwtennis
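The isinstance-based check Kyle proposes could be sketched as below. This is only an illustration of the pattern, not Beam's actual code: the stub classes stand in for Beam's runner hierarchy (in Beam, FlinkRunner does extend PortableRunner under apache_beam.runners.portability), and `ensure_portable_experiments` is a hypothetical helper name. The point is that any runner derived from PortableRunner picks up beam_fn_api automatically, instead of matching class-name strings.

```python
# Stub classes standing in for Beam's runner hierarchy (illustration only).
class PipelineRunner:                 # stand-in for apache_beam.runners.PipelineRunner
    pass

class PortableRunner(PipelineRunner): # stand-in for the portable runner base class
    pass

class FlinkRunner(PortableRunner):    # portable: inherits from PortableRunner
    pass

class DataflowRunner(PipelineRunner): # legacy runner, not portable
    pass

def ensure_portable_experiments(runner, experiments):
    """Hypothetical helper: auto-add beam_fn_api for any portable runner,
    rather than special-casing individual runner class names."""
    if isinstance(runner, PortableRunner) and 'beam_fn_api' not in experiments:
        experiments.append('beam_fn_api')
    return experiments

print(ensure_portable_experiments(FlinkRunner(), []))     # ['beam_fn_api']
print(ensure_portable_experiments(DataflowRunner(), []))  # []
```

With a check like this, a user submitting to the Flink runner would not need to pass "--experiments=beam_fn_api" by hand, which is the workaround used earlier in this thread.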