Actually, the reported issues are already fixed on head. We're just trying
to prevent similar issues in the future.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:

>
>
> On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
>
>> > Isn't this flag set automatically for the portable runner?
>>
>> Yes, the flag is set automatically, but it has been broken before and
>> likely will be again. It just adds complexity to portable
>> Runners. There is no portability API other than the Fn API. This flag
>> historically had its justification, but it seems obsolete now.
>>
>
> I disagree that this flag is obsolete. It still serves a purpose for
> batch users of the Dataflow runner, and that is a decent chunk of Beam
> Python users.
>
> I agree with switching the default. I would like to allow enough time to
> decouple the flag from the core code (a quick search turned up two
> instances, related to Read and Create), test the changes, and then
> switch the default.
>
>
>>
>> An isinstance check might be smarter, but it does not get rid of the root
>> of the problem.
>>
>
> I might be wrong, but IIUC, it would temporarily resolve the reported
> issues. Is that not accurate?
>
>
>>
>> -Max
>>
>> On 17.09.19 14:20, Ahmet Altay wrote:
>> > Could you make that change and see if it would have addressed the issue
>> > here?
>> >
>> > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
>> >
>> >     The flag is automatically set, but not in a smart way. Taking
>> >     another look at the code, a more resilient fix would be to just
>> >     check whether the runner is an instance of PortableRunner.
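A minimal sketch of the difference between a check on the runner's name and the `isinstance` check suggested above. The classes and the `needs_fn_api_*` / `add_fn_api_experiment` helpers are hypothetical stand-ins, not the real Beam classes or the code in `pipeline.py`; the sketch assumes a runner such as `FlinkRunner` subclasses `PortableRunner`, so an exact-name comparison misses it while `isinstance` does not.

```python
from typing import List


# Hypothetical stand-ins for a runner class hierarchy (not the real Beam classes).
class PipelineRunner:
    pass


class PortableRunner(PipelineRunner):
    pass


class FlinkRunner(PortableRunner):  # assumption: a portable runner subclass
    pass


class DataflowRunner(PipelineRunner):
    pass


def needs_fn_api_by_name(runner: PipelineRunner) -> bool:
    # Brittle: matches only the exact class name, so subclasses are missed.
    return type(runner).__name__ == 'PortableRunner'


def needs_fn_api_by_type(runner: PipelineRunner) -> bool:
    # More resilient: any subclass of PortableRunner qualifies.
    return isinstance(runner, PortableRunner)


def add_fn_api_experiment(runner: PipelineRunner,
                          experiments: List[str]) -> List[str]:
    """Return a copy of experiments with 'beam_fn_api' added for portable runners."""
    if needs_fn_api_by_type(runner) and 'beam_fn_api' not in experiments:
        return list(experiments) + ['beam_fn_api']
    return list(experiments)


if __name__ == '__main__':
    for runner in (PortableRunner(), FlinkRunner(), DataflowRunner()):
        # Prints the class name, the name-based result, and the isinstance result.
        print(type(runner).__name__,
              needs_fn_api_by_name(runner),
              needs_fn_api_by_type(runner))
```

This only hardens how the flag gets set automatically; the flag itself, and the code paths that depend on it, remain.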
>> >
>> >
>> >
>> >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
>> >
>> >         Isn't this flag set automatically for the portable runner here [1]?
>> >
>> >         [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>> >
>> >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
>> >
>> >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>> >              >
>> >              > +1 for making --experiments=beam_fn_api default.
>> >              >
>> >              > Can the Dataflow runner driver just remove the setting if it is not compatible?
>> >
>> >             The tricky bit would be undoing the differences in graph
>> >             construction due to this flag flip. But I would be in favor of
>> >             changing the default (probably just removing the flag) and
>> >             moving the non-portability parts into the Dataflow runner
>> >             itself. (It looks like the key differences here are for the
>> >             Create and Read transforms.)
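To illustrate why undoing the flag after the fact is tricky, here is a toy model (not Beam code) in which a `Create`-like transform chooses its expansion once, at pipeline construction time. `ToyPipeline`, its `create` method, and the step strings are hypothetical; the two branches are only assumed to loosely resemble an Impulse-based expansion versus a Read of an SDK-specific source.

```python
from dataclasses import dataclass, field
from typing import List, Sequence


@dataclass
class ToyPipeline:
    """Toy pipeline that records primitive steps as transforms are applied (hypothetical)."""
    experiments: List[str]
    steps: List[str] = field(default_factory=list)

    def create(self, values: Sequence[str]) -> None:
        # The expansion is decided here, once, using the experiments in effect now.
        if 'beam_fn_api' in self.experiments:
            # Assumed portable-style shape: an Impulse followed by an SDK-side
            # function that emits the values.
            self.steps += ['Impulse', 'FlatMap(emit %d values)' % len(values)]
        else:
            # Assumed legacy shape: a Read of an SDK-specific source.
            self.steps += ['Read(CreateSource)']


p = ToyPipeline(experiments=[])
p.create(['Hello World'])
# Changing the experiments afterwards does not rewrite steps already built.
p.experiments.append('beam_fn_api')
print(p.steps)  # ['Read(CreateSource)']
```

Because the choice is baked into the graph, a runner driver cannot simply strip the flag later. One reading of the proposal above is that core code would always build the portable shape, and only the Dataflow runner would substitute the legacy form where it needs it.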
>> >
>> >              > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
>> >              >>
>> >              >> +dev
>> >              >>
>> >              >> The beam_fn_api flag and the way it is automatically set is error-prone.
>> >              >> Is there anything that prevents us from removing it? I understand that
>> >              >> some Runners, e.g. Dataflow Runner, have two modes of executing Python
>> >              >> pipelines (legacy and portable), but at this point it seems clear that
>> >              >> the portability mode should be the default.
>> >              >>
>> >              >> Cheers,
>> >              >> Max
>> >              >>
>> >              >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>> >              >>
>> >              >>     Kyle
>> >              >>
>> >              >>     Thank you for the assistance.
>> >              >>
>> >              >>     By specifying "experiments" in PipelineOptions,
>> >              >>     ==========================================
>> >              >>              options = PipelineOptions([
>> >              >>                            "--runner=FlinkRunner",
>> >              >>                            "--flink_version=1.8",
>> >              >>                            "--flink_master_url=localhost:8081",
>> >              >>                            "--experiments=beam_fn_api"
>> >              >>                        ])
>> >              >>     ==========================================
>> >              >>
>> >              >>     I was able to submit the job successfully.
>> >              >>
>> >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> >              >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>> >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>> >              >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>> >              >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
>> >              >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>> >              >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>> >              >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>> >              >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>> >              >>
>> >              >>     Thanks,
>> >              >>     Yu Watanabe
>> >              >>
>> >              >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
>> >              >>
>> >              >>         Try adding "--experiments=beam_fn_api" to your pipeline options.
>> >              >>         (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
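The workaround above amounts to making sure the experiment is present in the options list before the pipeline is built. A small helper along these lines (hypothetical, not part of Beam) appends it only when it is missing. It assumes options are passed as a list of `--flag=value` strings, as in the snippets in this thread, and it only recognizes the exact `--experiments=beam_fn_api` and `--experiment=beam_fn_api` forms.

```python
from typing import List, Sequence

FN_API_EXPERIMENT = 'beam_fn_api'


def with_experiment(argv: Sequence[str], name: str = FN_API_EXPERIMENT) -> List[str]:
    """Return a copy of argv that contains --experiments=<name> (hypothetical helper).

    Only exact '--experiments=<name>' or '--experiment=<name>' entries are
    recognized; comma-separated values and the space-separated
    '--experiments <name>' form are not parsed.
    """
    wanted = {'--experiments=' + name, '--experiment=' + name}
    if any(arg in wanted for arg in argv):
        return list(argv)
    return list(argv) + ['--experiments=' + name]


argv = with_experiment([
    '--runner=FlinkRunner',
    '--flink_version=1.8',
    '--flink_master_url=localhost:8081',
])
print(argv[-1])  # --experiments=beam_fn_api
```

The resulting list can be passed to `PipelineOptions(argv)` as in the snippets in this thread. Per the note above, this should only be necessary on Beam 2.15.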
>> >              >>
>> >              >>
>> >              >>
>> >              >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>> >              >>
>> >              >>             Hello.
>> >              >>
>> >              >>             I am trying to spin up the Flink runner, but it looks like data
>> >              >>             serialization is failing.
>> >              >>             I would like to ask for help getting past this error.
>> >              >>
>> >              >>
>> >              >>             ========================================================================
>> >              >>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> >              >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>> >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> >              >>                      at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>> >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>> >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>> >              >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>> >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>> >              >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>> >              >>                      at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>> >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>> >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>> >              >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>> >              >>                      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >              >>                      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >              >>                      at java.lang.Thread.run(Thread.java:748)
>> >              >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>> >              >>                      at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>> >              >>                      at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>> >              >>                      at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>> >              >>                      at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>> >              >>                      at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>> >              >>                      at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>> >              >>                      at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>> >              >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>> >              >>                      ... 13 more
>> >              >>             ========================================================================
>> >              >>
>> >              >>             My beam version is below.
>> >              >>
>> >              >>
>> >              >>             =======================================================================
>> >              >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>> >              >>             apache-beam==2.15.0
>> >              >>             =======================================================================
>> >              >>
>> >              >>             I have my harness container ready on the registry.
>> >              >>
>> >              >>             =======================================================================
>> >              >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>> >              >>             NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
>> >              >>             beam/python3                            0
>> >              >>             =======================================================================
>> >              >>
>> >              >>             Flink is ready on a separate cluster.
>> >              >>
>> >              >>             =======================================================================
>> >              >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>> >              >>             tcp    LISTEN     0      128      :::8081                 :::*
>> >              >>             =======================================================================
>> >              >>
>> >              >>
>> >              >>             My debian version.
>> >              >>
>> >              >>
>> >              >>             =======================================================================
>> >              >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>> >              >>             9.11
>> >              >>             =======================================================================
>> >              >>
>> >              >>
>> >              >>             My code snippet is below.
>> >              >>
>> >              >>
>> >              >>             =======================================================================
>> >              >>
>> >              >>                  import apache_beam as beam
>> >              >>                  from apache_beam.options.pipeline_options import PipelineOptions
>> >              >>
>> >              >>                  options = PipelineOptions([
>> >              >>                                "--runner=FlinkRunner",
>> >              >>                                "--flink_version=1.8",
>> >              >>                                "--flink_master_url=localhost:8081"
>> >              >>                            ])
>> >              >>
>> >              >>                  with beam.Pipeline(options=options) as p:
>> >              >>
>> >              >>                      (p | beam.Create(["Hello World"]))
>> >              >>             =======================================================================
>> >              >>
>> >              >>
>> >              >>             Are there any other settings I should look at?
>> >              >>
>> >              >>             Thanks,
>> >              >>             Yu Watanabe
>> >              >>
>> >              >>             --
>> >              >>             Yu Watanabe
>> >              >>             Weekend Freelancer who loves to challenge building data platform
>> >              >>             yu.w.ten...@gmail.com
>> >              >>             LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
>> >              >>             Twitter <https://twitter.com/yuwtennis>
>> >              >>
>> >              >>
>> >              >>
>> >              >>
>> >
>>
>
