The flag is automatically set, but not in a smart way. Taking another look
at the code, a more resilient fix would be to check whether the runner is
an instance of PortableRunner.
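The idea could be sketched roughly like this. Note this is an illustrative
sketch only: the classes below are hypothetical stand-ins for Beam's runner
hierarchy, not the real ones, and the actual fix would live in pipeline.py.

```python
# Illustrative sketch of the isinstance-based check. PipelineRunner,
# PortableRunner and DataflowRunner here are hypothetical stand-ins.
class PipelineRunner:
    pass

class PortableRunner(PipelineRunner):
    pass

class DataflowRunner(PipelineRunner):
    pass

def experiments_for(runner, experiments):
    """Enable beam_fn_api based on the runner's type, not a name match."""
    experiments = list(experiments)
    if isinstance(runner, PortableRunner) and 'beam_fn_api' not in experiments:
        experiments.append('beam_fn_api')
    return experiments
```

Keying off the type rather than a class-name string means subclasses of
PortableRunner would get the flag too.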

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:

> Isn't this flag set automatically for the portable runner here [1]?
>
> [1]
> https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>
> On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>> >
>> > +1 for making --experiments=beam_fn_api default.
>> >
>> > Can the Dataflow runner driver just remove the setting if it is not
>> compatible?
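Stripping the flag on the runner side could look roughly like this (a
sketch with hypothetical helper and parameter names, not Beam's actual
code):

```python
# Hypothetical sketch: a runner dropping an experiment it cannot honor
# before pipeline translation. Names are illustrative, not Beam's real API.
def strip_incompatible_experiments(experiments, unsupported=('beam_fn_api',)):
    """Return the experiments list without any the runner cannot support."""
    return [e for e in experiments if e not in unsupported]
```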
>>
>> The tricky bit would be undoing the differences in graph construction
>> due to this flag flip. But I would be in favor of changing the default
>> (probably just removing the flag) and moving the non-portability parts
>> into the dataflow runner itself. (It looks like the key differences
>> here are for the Create and Read transforms.)
>>
>> > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org>
>> wrote:
>> >>
>> >> +dev
>> >>
>> >> The beam_fn_api flag and the way it is automatically set are
>> >> error-prone. Is there anything that prevents us from removing it? I
>> >> understand that some runners, e.g. the Dataflow runner, have two modes
>> >> of executing Python pipelines (legacy and portable), but at this point
>> >> it seems clear that the portability mode should be the default.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
>> >> <yu.w.ten...@gmail.com> wrote:
>> >>
>> >>     Kyle
>> >>
>> >>     Thank you for the assistance.
>> >>
>> >>     By specifying "experiments" in PipelineOptions,
>> >>     ==========================================
>> >>              options = PipelineOptions([
>> >>                            "--runner=FlinkRunner",
>> >>                            "--flink_version=1.8",
>> >>                            "--flink_master_url=localhost:8081",
>> >>                            "--experiments=beam_fn_api"
>> >>                        ])
>> >>     ==========================================
>> >>
>> >>     I was able to submit the job successfully.
>> >>
>> >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>> >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>> >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>> >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
>> >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>> >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>> >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>> >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>> >>
>> >>     Thanks,
>> >>     Yu Watanabe
>> >>
>> >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com>
>> >>     wrote:
>> >>
>> >>         Try adding "--experiments=beam_fn_api" to your pipeline
>> >>         options. (This is a known issue with Beam 2.15 that will be
>> >>         fixed in 2.16.)
>> >>
>> >>         Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>> >>
>> >>
>> >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
>> >>         <yu.w.ten...@gmail.com> wrote:
>> >>
>> >>             Hello.
>> >>
>> >>             I am trying to spin up the Flink runner, but it looks like
>> >>             data serialization is failing.
>> >>             I would like to ask for help getting past this error.
>> >>
>> >>
>>  ========================================================================
>> >>             [flink-runner-job-invoker] ERROR
>> >>             org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
>> >>             - Error during job invocation
>> >>             BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>> >>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> >>                 at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>> >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>> >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>> >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>> >>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>> >>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>> >>                 at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>> >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>> >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>> >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>> >>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >>                 at java.lang.Thread.run(Thread.java:748)
>> >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>> >>                 at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>> >>                 at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>> >>                 at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>> >>                 at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>> >>                 at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>> >>                 at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>> >>                 at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>> >>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>> >>                 ... 13 more
>>  ========================================================================
>> >>
>> >>             My beam version is below.
>> >>
>> >>
>>  =======================================================================
>> >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep
>> >>             apache-beam
>> >>             apache-beam==2.15.0
>> >>
>>  =======================================================================
>> >>
>> >>             I have my harness container ready on the registry.
>> >>
>> >>
>>  =======================================================================
>> >>             ywatanabe@debian-09-00:~$ docker search
>> >>             ywatanabe-docker-apache.bintray.io/python3
>> >>             NAME                DESCRIPTION         STARS
>> >>             OFFICIAL            AUTOMATED
>> >>             beam/python3                            0
>> >>
>>  =======================================================================
>> >>
>> >>             Flink is ready on separate cluster.
>> >>
>> >>
>>  =======================================================================
>> >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>> >>             tcp    LISTEN     0      128      :::8081         :::*
>> >>
>>  =======================================================================
>> >>
>> >>
>> >>             My debian version.
>> >>
>> >>
>>  =======================================================================
>> >>
>> >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>> >>             9.11
>> >>
>>  =======================================================================
>> >>
>> >>
>> >>             My code snippet is below.
>> >>
>> >>
>>  =======================================================================
>> >>
>> >>                  options = PipelineOptions([
>> >>                                "--runner=FlinkRunner",
>> >>                                "--flink_version=1.8",
>> >>             "--flink_master_url=localhost:8081"
>> >>                            ])
>> >>
>> >>                  with beam.Pipeline(options=options) as p:
>> >>
>> >>                      (p | beam.Create(["Hello World"]))
>> >>
>>  =======================================================================
>> >>
>> >>
>> >>             Are there any other settings I should look for?
>> >>
>> >>             Thanks,
>> >>             Yu Watanabe
>> >>
>> >>             --
>> >>             Yu Watanabe
>> >>             Weekend Freelancer who loves to challenge building data
>> >>             platform
>> >>             yu.w.ten...@gmail.com
>> >>             LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>> >>             Twitter: https://twitter.com/yuwtennis
>> >>
>> >>
>> >>
>> >>     --
>> >>     Yu Watanabe
>> >>     Weekend Freelancer who loves to challenge building data platform
>> >>     yu.w.ten...@gmail.com
>> >>     LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>> >>     Twitter: https://twitter.com/yuwtennis
>> >>
>>
>
