> I disagree that this flag is obsolete. It is still serving a purpose for batch
> users using the Dataflow runner, and that is a decent chunk of Beam Python users.

It is obsolete for the PortableRunner. If the Dataflow Runner needs this flag, couldn't we simply add it there? As far as I know, Dataflow users do not use the PortableRunner, but I might be wrong.
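Roughly what I have in mind, as a sketch only; the helper name and its placement are made up here, not existing Beam code:

    from apache_beam.options.pipeline_options import DebugOptions

    def _apply_fn_api_default(options, use_fn_api):
        # Hypothetical helper inside the Dataflow runner: the runner decides
        # whether it runs in legacy or portable (Fn API) mode and adds/strips
        # the experiment itself, instead of the SDK core guessing for it.
        debug = options.view_as(DebugOptions)
        experiments = list(debug.experiments or [])
        if use_fn_api and 'beam_fn_api' not in experiments:
            experiments.append('beam_fn_api')
        elif not use_fn_api and 'beam_fn_api' in experiments:
            experiments.remove('beam_fn_api')
        debug.experiments = experiments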

As Kyle mentioned, he already fixed the issue, but the fix will only be available starting with the 2.16.0 release. This flag has repeatedly caused friction for users, which is why I want to get rid of it.

There is of course no need to rush this, but it would be great to tackle it for the next release. Filed a JIRA: https://jira.apache.org/jira/browse/BEAM-8274

Cheers,
Max

On 17.09.19 15:39, Kyle Weaver wrote:
Actually, the reported issues are already fixed on head. We're just trying to prevent similar issues in the future.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:



    On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:

         > Is not this flag set automatically for the portable runner

        Yes, the flag is set automatically, but it has been broken before and
        likely will be again. It just adds additional complexity to portable
        runners. There is no other portability API than the Fn API. This flag
        historically had its justification, but seems obsolete now.


    I disagree that this flag is obsolete. It is still serving a purpose for
    batch users using the Dataflow runner, and that is a decent chunk of Beam
    Python users.

    I agree with switching the default. I would like to give enough time to
    decouple the flag from the core code (with a quick search I saw two
    instances, related to Read and Create), have time to test the changes, and
    then switch the default.
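    (To make the "decouple the flag, then flip the default" step concrete, one
    possible shape, purely illustrative and not actual Beam code:)

    import logging
    from apache_beam.options.pipeline_options import DebugOptions

    def _fn_api_enabled(options):
        # Illustrative only: once the flag is decoupled from core code, the
        # default becomes "on", and passing the experiment explicitly is a
        # no-op that we can warn about before removing it entirely.
        experiments = options.view_as(DebugOptions).experiments or []
        if 'beam_fn_api' in experiments:
            logging.warning("--experiments=beam_fn_api is now the default and "
                            "will be removed in a future release.")
        return True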


        An isinstance check might be smarter, but does not get rid of the root
        of the problem.


    I might be wrong, but IIUC it will temporarily resolve the reported issues.
    Is this not accurate?


        -Max

        On 17.09.19 14:20, Ahmet Altay wrote:
         > Could you make that change and see if it would have addressed the
         > issue here?
         >
         > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
         >
         >     The flag is automatically set, but not in a smart way. Taking
         >     another look at the code, a more resilient fix would be to just
         >     check if the runner isinstance of PortableRunner.
         >
         >     Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
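(For concreteness, the isinstance-based check would look roughly like the sketch
below; this is a simplified illustration, not the fix that actually landed in Beam:)

    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.runners.portability.portable_runner import PortableRunner

    def _ensure_fn_api_experiment(options, runner):
        # Sketch only: check the runner's type rather than, e.g., matching on
        # its class name, so subclasses of PortableRunner are covered too.
        if isinstance(runner, PortableRunner):
            debug = options.view_as(DebugOptions)
            experiments = debug.experiments or []
            if 'beam_fn_api' not in experiments:
                debug.experiments = experiments + ['beam_fn_api']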
         >
         >
         >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
         >
         >         Is not this flag set automatically for the portable runner
         >         here [1]?
         >
         >         [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
         >
         >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
         >
         >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
         >              >
         >              > +1 for making --experiments=beam_fn_api default.
         >              >
         >              > Can the Dataflow runner driver just remove the setting
         >              > if it is not compatible?
         >
         >             The tricky bit would be undoing the differences in graph
         >             construction due to this flag flip. But I would be in
         >             favor of changing the default (probably just removing the
         >             flag) and moving the non-portability parts into the
         >             Dataflow runner itself. (It looks like the key differences
         >             here are for the Create and Read transforms.)
         >
         >              > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
         >              >>
         >              >> +dev
         >              >>
         >              >> The beam_fn_api flag and the way it is automatically
         >              >> set is error-prone. Is there anything that prevents us
         >              >> from removing it? I understand that some runners, e.g.
         >              >> the Dataflow Runner, have two modes of executing Python
         >              >> pipelines (legacy and portable), but at this point it
         >              >> seems clear that the portability mode should be the
         >              >> default.
         >              >>
         >              >> Cheers,
         >              >> Max
         >              >>
         >              >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
         >              >> <yu.w.ten...@gmail.com> wrote:
         >              >>
         >              >>     Kyle
         >              >>
         >              >>     Thank you for the assistance.
         >              >>
         >              >>     By specifying "experiments" in PipelineOptions,
         >              >>     ==========================================
         >              >>             options = PipelineOptions([
         >              >>                           "--runner=FlinkRunner",
         >              >>                           "--flink_version=1.8",
         >              >>                           "--flink_master_url=localhost:8081",
         >              >>                           "--experiments=beam_fn_api"
         >              >>                       ])
         >              >>     ==========================================
         >              >>
         >              >>     I was able to submit the job successfully.
         >              >>
         >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
         >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
         >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
         >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
         >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
         >              >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
         >              >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
         >              >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
         >              >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
         >              >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
         >              >>
         >              >>     Thanks,
         >              >>     Yu Watanabe
         >              >>
         >              >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver
         >              >>     <kcwea...@google.com> wrote:
         >              >>
         >              >>         Try adding "--experiments=beam_fn_api" to
         >              >>         your pipeline options. (This is a known issue
         >              >>         with Beam 2.15 that will be fixed in 2.16.)
         >              >>
         >              >>         Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
         >              >>
         >              >>
         >              >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
         >              >>         <yu.w.ten...@gmail.com> wrote:
         >              >>
         >              >>             Hello.
         >              >>
         >              >>             I am trying to spin up the Flink runner,
         >              >>             but it looks like data serialization is
         >              >>             failing. I would like to ask for help to
         >              >>             get past this error.
         >              >>
         >              >>
         >              >>             ========================================================================
         >              >>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
         >              >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
         >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
         >              >>                      at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
         >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
         >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
         >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
         >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
         >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
         >              >>                      at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
         >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
         >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
         >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
         >              >>                      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         >              >>                      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         >              >>                      at java.lang.Thread.run(Thread.java:748)
         >              >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
         >              >>                      at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
         >              >>                      at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
         >              >>                      at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
         >              >>                      at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
         >              >>                      at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
         >              >>                      at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
         >              >>                      at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
         >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
         >              >>                      ... 13 more
         >              >>
         >              >>             ========================================================================
         >              >>
         >              >>             My Beam version is below.
         >              >>
         >              >>             =======================================================================
         >              >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
         >              >>             apache-beam==2.15.0
         >              >>             =======================================================================
         >              >>
         >              >>             I have my harness container ready on the
         >              >>             registry.
         >              >>
         >              >>             =======================================================================
         >              >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
         >              >>             NAME                DESCRIPTION         STARS     OFFICIAL    AUTOMATED
         >              >>             beam/python3                            0
         >              >>             =======================================================================
         >              >>
         >              >>             Flink is ready on a separate cluster.
         >              >>
         >              >>             =======================================================================
         >              >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
         >              >>             tcp    LISTEN     0      128    :::8081    :::*
         >              >>             =======================================================================
         >              >>
         >              >>
         >              >>             My Debian version.
         >              >>
         >              >>             =======================================================================
         >              >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
         >              >>             9.11
         >              >>             =======================================================================
         >              >>
         >              >>
         >              >>             My code snippet is below.
         >              >>
         >              >>             =======================================================================
         >              >>                 options = PipelineOptions([
         >              >>                               "--runner=FlinkRunner",
         >              >>                               "--flink_version=1.8",
         >              >>                               "--flink_master_url=localhost:8081"
         >              >>                           ])
         >              >>
         >              >>                 with beam.Pipeline(options=options) as p:
         >              >>
         >              >>                     (p | beam.Create(["Hello World"]))
         >              >>             =======================================================================
         >              >>
         >              >>
         >              >>             Would there be any other settings I
         >              >>             should look for?
         >              >>
         >              >>             Thanks,
         >              >>             Yu Watanabe
         >              >>
         >              >>             --
         >              >>             Yu Watanabe
         >              >>             Weekend Freelancer who loves to challenge
         >              >>             building data platform
         >              >>             yu.w.ten...@gmail.com
         >              >>             LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
         >              >>             Twitter <https://twitter.com/yuwtennis>
         >              >>
         >              >>
         >              >>
         >              >>     --
         >              >>     Yu Watanabe
         >              >>     Weekend Freelancer who loves to challenge building
         >              >>     data platform
         >              >>     yu.w.ten...@gmail.com
         >              >>     LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
         >              >>     Twitter <https://twitter.com/yuwtennis>
         >              >>
         >
