I believe the flag was never relevant for PortableRunner, though I might be
wrong. The flag affects a few bits of the core code, which is why the fix
cannot be as simple as setting the flag in the Dataflow runner. It requires
some amount of cleanup. I agree that it would be good to clean this up, and
I also agree that we should not rush it, especially since it is not
currently impacting users.
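
For reference, the isinstance-based check suggested further down in the
thread could look roughly like the sketch below. It uses stand-in classes
rather than the real Beam ones, and the helper name effective_experiments
is hypothetical, so treat it as an illustration of the idea and not as the
actual code in pipeline.py.

```python
# Stand-in classes for illustration only; these are NOT the real Beam classes.
class PipelineRunner:
    """Hypothetical base class for all runners."""


class PortableRunner(PipelineRunner):
    """Hypothetical stand-in for a runner that always uses the Fn API."""


class FlinkRunner(PortableRunner):
    """Hypothetical stand-in for a portable runner subclass."""


class DataflowRunner(PipelineRunner):
    """Hypothetical stand-in for a runner with a legacy, non-portable mode."""


FN_API_EXPERIMENT = "beam_fn_api"


def effective_experiments(runner, experiments):
    """Return the experiments list, adding beam_fn_api for portable runners.

    The decision is based on the runner's type instead of its name, so
    subclasses of PortableRunner are covered automatically.
    """
    result = list(experiments)
    if isinstance(runner, PortableRunner) and FN_API_EXPERIMENT not in result:
        result.append(FN_API_EXPERIMENT)
    return result


if __name__ == "__main__":
    print(effective_experiments(FlinkRunner(), []))
    print(effective_experiments(DataflowRunner(), []))
```

This only covers how the flag gets set. It does not address the places in
the core code (Read and Create) that branch on the flag.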

Ahmet

On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels <m...@apache.org> wrote:

> > I disagree that this flag is obsolete. It is still serving a purpose for
> > batch users of the Dataflow runner, and that is a decent chunk of Beam
> > Python users.
>
> It is obsolete for the PortableRunner. If the Dataflow Runner needs this
> flag, couldn't we simply add it there? As far as I know Dataflow users
> do not use the PortableRunner. I might be wrong.
>
> As Kyle mentioned, he already fixed the issue. The fix is only present
> in the 2.16.0 release though. This flag has repeatedly caused friction
> for users and that's why I want to get rid of it.
>
> There is of course no need to rush this but it would be great to tackle
> this for the next release. Filed a JIRA:
> https://jira.apache.org/jira/browse/BEAM-8274
>
> Cheers,
> Max
>
> On 17.09.19 15:39, Kyle Weaver wrote:
> > Actually, the reported issues are already fixed on head. We're just
> > trying to prevent similar issues in the future.
> >
> > Kyle Weaver | Software Engineer | github.com/ibzib
> > <http://github.com/ibzib> | kcwea...@google.com <mailto:
> kcwea...@google.com>
> >
> >
> > On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:
> >
> >
> >
> >     On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
> >
> >          > Is not this flag set automatically for the portable runner
> >
> >         Yes, the flag is set automatically, but it has been broken
> >         before and likely will be again. It just adds additional
> >         complexity to portable Runners. There is no other portability
> >         API than the Fn API. This flag historically had its
> >         justification, but seems obsolete now.
> >
> >
> >     I disagree that this flag is obsolete. It is still serving a purpose
> >     for batch users of the Dataflow runner, and that is a decent chunk
> >     of Beam Python users.
> >
> >     I agree with switching the default. I would like to give enough time
> >     to decouple the flag from the core code (with a quick search I saw
> >     two instances, related to Read and Create), have time to test the
> >     changes, and then switch the default.
> >
> >
> >         An isinstance check might be smarter, but does not get rid of
> >         the root of the problem.
> >
> >
> >     I might be wrong, but IIUC it will temporarily resolve the reported
> >     issues. Is this not accurate?
> >
> >
> >         -Max
> >
> >         On 17.09.19 14:20, Ahmet Altay wrote:
> >          > Could you make that change and see if it would have addressed
> >          > the issue here?
> >          >
> >          > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
> >          >
> >          >     The flag is automatically set, but not in a smart way.
> >          >     Taking another look at the code, a more resilient fix
> >          >     would be to just check whether the runner is an instance
> >          >     of PortableRunner.
> >          >
> >          >     Kyle Weaver | Software Engineer | github.com/ibzib
> >         <http://github.com/ibzib>
> >          >     <http://github.com/ibzib> | kcwea...@google.com
> >         <mailto:kcwea...@google.com>
> >          >     <mailto:kcwea...@google.com <mailto:kcwea...@google.com>>
> >          >
> >          >
> >          >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
> >          >
> >          >         Is not this flag set automatically for the portable
> >          >         runner here [1]?
> >          >
> >          >         [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >          >
> >          >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
> >          >
> >          >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> >          >              >
> >          >              > +1 for making --experiments=beam_fn_api default.
> >          >              >
> >          >              > Can the Dataflow runner driver just remove the
> >          >              > setting if it is not compatible?
> >          >
> >          >             The tricky bit would be undoing the differences in
> >          >             graph construction due to this flag flip. But I
> >          >             would be in favor of changing the default (probably
> >          >             just removing the flag) and moving the
> >          >             non-portability parts into the dataflow runner
> >          >             itself. (It looks like the key differences here are
> >          >             for the Create and Read transforms.)
> >          >
> >          >              > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
> >          >              >>
> >          >              >> +dev
> >          >              >>
> >          >              >> The beam_fn_api flag and the way it is
> >          >              >> automatically set is error-prone. Is there
> >          >              >> anything that prevents us from removing it? I
> >          >              >> understand that some Runners, e.g. the Dataflow
> >          >              >> Runner, have two modes of executing Python
> >          >              >> pipelines (legacy and portable), but at this
> >          >              >> point it seems clear that the portability mode
> >          >              >> should be the default.
> >          >              >>
> >          >              >> Cheers,
> >          >              >> Max
> >          >              >>
> >          >              >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> >          >              >>
> >          >              >>     Kyle
> >          >              >>
> >          >              >>     Thank you for the assistance.
> >          >              >>
> >          >              >>     By specifying "experiments" in PipelineOptions,
> >          >              >>     ==========================================
> >          >              >>     options = PipelineOptions([
> >          >              >>         "--runner=FlinkRunner",
> >          >              >>         "--flink_version=1.8",
> >          >              >>         "--flink_master_url=localhost:8081",
> >          >              >>         "--experiments=beam_fn_api"
> >          >              >>     ])
> >          >              >>     ==========================================
> >          >              >>
> >          >              >>     I was able to submit the job successfully.
> >          >              >>
> >          >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >          >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
> >          >              >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> >          >              >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
> >          >              >>
> >          >              >>     Thanks,
> >          >              >>     Yu Watanabe
> >          >              >>
> >          >              >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
> >          >              >>
> >          >              >>         Try adding "--experiments=beam_fn_api" to your
> >          >              >>         pipeline options. (This is a known issue with
> >          >              >>         Beam 2.15 that will be fixed in 2.16.)
> >          >              >>
> >          >              >>         Kyle Weaver | Software Engineer |
> >          > github.com/ibzib <http://github.com/ibzib>
> >         <http://github.com/ibzib>
> >          >              >>         <http://github.com/ibzib> |
> >         kcwea...@google.com <mailto:kcwea...@google.com>
> >          >             <mailto:kcwea...@google.com
> >         <mailto:kcwea...@google.com>>
> >          >              >>         <mailto:kcwea...@google.com
> >         <mailto:kcwea...@google.com>
> >          >             <mailto:kcwea...@google.com
> >         <mailto:kcwea...@google.com>>>
> >          >              >>
> >          >              >>
> >          >              >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> >          >              >>
> >          >              >>             Hello.
> >          >              >>
> >          >              >>             I am trying to spin up the Flink runner, but
> >          >              >>             it looks like data serialization is failing.
> >          >              >>             I would like to ask for help getting past
> >          >              >>             this error.
> >          >              >>
> >          >              >>
> >          >              >>             ========================================================================
> >          >              >>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> >          >              >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> >          >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> >          >              >>                      at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> >          >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> >          >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> >          >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> >          >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> >          >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> >          >              >>                      at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> >          >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> >          >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> >          >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> >          >              >>                      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >          >              >>                      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >          >              >>                      at java.lang.Thread.run(Thread.java:748)
> >          >              >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> >          >              >>                      at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> >          >              >>                      at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> >          >              >>                      at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> >          >              >>                      at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> >          >              >>                      at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> >          >              >>                      at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> >          >              >>                      at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> >          >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> >          >              >>                      ... 13 more
> >          >              >>             ========================================================================
> >          >              >>
> >          >              >>             My beam version is below.
> >          >              >>
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> >          >              >>             apache-beam==2.15.0
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>             I have my harness container ready on the
> >          >              >>             registry.
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> >          >              >>             NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
> >          >              >>             beam/python3                            0
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>             Flink is ready on a separate cluster.
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> >          >              >>             tcp    LISTEN     0      128      :::8081      :::*
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>
> >          >              >>             My debian version.
> >          >              >>
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> >          >              >>             9.11
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>
> >          >              >>             My code snippet is below.
> >          >              >>
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>                  options = PipelineOptions([
> >          >              >>                      "--runner=FlinkRunner",
> >          >              >>                      "--flink_version=1.8",
> >          >              >>                      "--flink_master_url=localhost:8081"
> >          >              >>                  ])
> >          >              >>
> >          >              >>                  with beam.Pipeline(options=options) as p:
> >          >              >>
> >          >              >>                      (p | beam.Create(["Hello World"]))
> >          >              >>
> >          >              >>             =======================================================================
> >          >              >>
> >          >              >>
> >          >              >>             Are there any other settings I should look
> >          >              >>             for?
> >          >              >>
> >          >              >>             Thanks,
> >          >              >>             Yu Watanabe
> >          >              >>
> >          >              >>             --
> >          >              >>             Yu Watanabe
> >          >              >>             Weekend Freelancer who loves to
> >         challenge
> >          >             building data
> >          >              >>             platform
> >          >              >> yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com>>
> >          >             <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com>>>
> >          >              >>             LinkedIn icon
> >          >             <https://www.linkedin.com/in/yuwatanabe1>
> >          >              >>             Twitter icon
> >         <https://twitter.com/yuwtennis>
> >          >              >>
> >          >              >>
> >          >              >>
> >          >              >>     --
> >          >              >>     Yu Watanabe
> >          >              >>     Weekend Freelancer who loves to challenge
> >         building
> >          >             data platform
> >          >              >> yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com>>
> >          >             <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
> >         <mailto:yu.w.ten...@gmail.com>>>
> >          >              >>     LinkedIn icon
> >          >             <https://www.linkedin.com/in/yuwatanabe1> Twitter
> >         icon
> >          >              >>     <https://twitter.com/yuwtennis>
> >          >              >>
> >          >
> >
>
