Could you make that change and see if it would have addressed the issue
here?
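Kyle's suggestion below (keying on whether the runner is an instance of PortableRunner, rather than on how the flag happens to be set) might look roughly like the following sketch. The classes here are stand-ins for illustration, not the real Beam imports:

```python
# Sketch only: these classes stand in for apache_beam.runners.runner.PipelineRunner
# and apache_beam.runners.portability.portable_runner.PortableRunner.
class PipelineRunner:
    pass

class PortableRunner(PipelineRunner):
    pass

class LegacyDataflowRunner(PipelineRunner):
    pass

def ensure_fn_api_experiment(runner, experiments):
    """Add beam_fn_api whenever the runner is actually portable,
    instead of matching on runner-name strings."""
    if isinstance(runner, PortableRunner) and 'beam_fn_api' not in experiments:
        experiments.append('beam_fn_api')
    return experiments
```

With a check like this, any PortableRunner subclass would pick up the experiment automatically, and non-portable runners would be left alone.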

On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:

> The flag is automatically set, but not in a smart way. Taking another look
> at the code, a more resilient fix would be to just check whether the runner
> is an instance of PortableRunner.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
>
>> Isn't this flag set automatically for the portable runner here [1]?
>>
>> [1]
>> https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>>
>> On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>> >
>>> > +1 for making --experiments=beam_fn_api default.
>>> >
>>> > Can the Dataflow runner driver just remove the setting if it is not
>>> compatible?
>>>
>>> The tricky bit would be undoing the differences in graph construction
>>> due to this flag flip. But I would be in favor of changing the default
>>> (probably just removing the flag) and moving the non-portability parts
>>> into the dataflow runner itself. (It looks like the key differences
>>> here are for the Create and Read transforms.)
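Thomas's idea of having the runner driver drop the setting could be sketched as below; the function name and the supports flag are hypothetical, and as Robert notes, stripping the flag at submission time alone would not undo graph-construction differences already introduced by it:

```python
def strip_incompatible_experiments(experiments, runner_supports_fn_api):
    # Hypothetical submission-time cleanup: drop beam_fn_api for runners
    # that cannot honor it. Note that any pipeline graph already constructed
    # differently under the flag (e.g. the Create and Read translations)
    # is NOT undone by removing the experiment here.
    if runner_supports_fn_api:
        return list(experiments)
    return [e for e in experiments if e != 'beam_fn_api']
```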
>>>
>>> > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>> >>
>>> >> +dev
>>> >>
>>> >> The beam_fn_api flag and the way it is automatically set are
>>> >> error-prone. Is there anything that prevents us from removing it? I
>>> >> understand that some runners, e.g. the Dataflow runner, have two modes
>>> >> of executing Python pipelines (legacy and portable), but at this point
>>> >> it seems clear that the portability mode should be the default.
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
>>> >> <yu.w.ten...@gmail.com> wrote:
>>> >>
>>> >>     Kyle
>>> >>
>>> >>     Thank you for the assistance.
>>> >>
>>> >>     By specifying "experiments" in PipelineOptions ,
>>> >>     ==========================================
>>> >>              options = PipelineOptions([
>>> >>                            "--runner=FlinkRunner",
>>> >>                            "--flink_version=1.8",
>>> >>                            "--flink_master_url=localhost:8081",
>>> >>                            "--experiments=beam_fn_api"
>>> >>                        ])
>>> >>     ==========================================
>>> >>
>>> >>     I was able to submit the job successfully.
>>> >>
>>> >>     [grpc-default-executor-0] INFO
>>> >>     org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
>>> >>     BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>> >>     [grpc-default-executor-0] INFO
>>> >>     org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
>>> >>     Starting job invocation
>>> >>     BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
>>> >>     pipeline to Flink program.
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
>>> >>     a Batch Execution Environment.
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
>>> >>     Flink Master URL localhost:8081.
>>> >>     [flink-runner-job-invoker] WARN
>>> >>     org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
>>> >>     default parallelism could be found. Defaulting to parallelism 1.
>>> >>     Please set an explicit parallelism with --parallelism
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.flink.api.java.ExecutionEnvironment - The job has 0
>>> >>     registered types and 0 default Kryo serializers
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.flink.configuration.Configuration - Config uses fallback
>>> >>     configuration key 'jobmanager.rpc.address' instead of key
>>> >>     'rest.address'
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.flink.runtime.rest.RestClient - Rest client endpoint
>>> >>     started.
>>> >>     [flink-runner-job-invoker] INFO
>>> >>     org.apache.flink.client.program.rest.RestClusterClient - Submitting
>>> >>     job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>> >>
>>> >>     Thanks,
>>> >>     Yu Watanabe
>>> >>
>>> >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com
>>> >>     <mailto:kcwea...@google.com>> wrote:
>>> >>
>>> >>         Try adding "--experiments=beam_fn_api" to your pipeline
>>> options.
>>> >>         (This is a known issue with Beam 2.15 that will be fixed in
>>> 2.16.)
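The workaround amounts to passing the experiment through the pipeline options list. A stdlib-only stand-in for how such a flag list is parsed (the real parser is PipelineOptions in apache_beam.options.pipeline_options; the argument names below just mirror this thread) might look like:

```python
import argparse

# Illustrative stand-in parser, not the real Beam options machinery.
parser = argparse.ArgumentParser()
parser.add_argument('--runner')
parser.add_argument('--flink_version')
parser.add_argument('--flink_master_url')
# Repeatable flag: each --experiments=... occurrence is collected into a list.
parser.add_argument('--experiments', action='append', default=[])

args = parser.parse_args([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api",
])
```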
>>> >>
>>> >>         Kyle Weaver | Software Engineer | github.com/ibzib
>>> >>         <http://github.com/ibzib> | kcwea...@google.com
>>> >>         <mailto:kcwea...@google.com>
>>> >>
>>> >>
>>> >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
>>> >>         <yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>> wrote:
>>> >>
>>> >>             Hello.
>>> >>
>>> >>             I am trying to spin up the Flink runner, but it looks
>>> >>             like data serialization is failing. I would like to ask
>>> >>             for help getting past this error.
>>> >>
>>> >>
>>> >>             ========================================================================
>>> >>             [flink-runner-job-invoker] ERROR
>>> >>             org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
>>> >>             - Error during job invocation
>>> >>             BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>>> >>             java.lang.IllegalArgumentException: unable to deserialize
>>> >>             BoundedSource
>>> >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>> >>                      at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>> >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>> >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>> >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>> >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>> >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>> >>                      at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>> >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>> >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>> >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>> >>                      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> >>                      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> >>                      at java.lang.Thread.run(Thread.java:748)
>>> >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>> >>                      at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>> >>                      at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>> >>                      at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>> >>                      at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>> >>                      at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>> >>                      at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>> >>                      at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>> >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>> >>                      ... 13 more
>>> >>             ========================================================================
>>> >>
>>> >>             My Beam version is below.
>>> >>
>>> >>
>>>  =======================================================================
>>> >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>>> >>             apache-beam==2.15.0
>>> >>
>>>  =======================================================================
>>> >>
>>> >>             I have my harness container ready on the registry.
>>> >>
>>> >>
>>>  =======================================================================
>>> >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>>> >>             NAME                DESCRIPTION         STARS     OFFICIAL   AUTOMATED
>>> >>             beam/python3                            0
>>> >>
>>>  =======================================================================
>>> >>
>>> >>             Flink is ready on a separate cluster.
>>> >>
>>> >>
>>>  =======================================================================
>>> >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>>> >>             tcp    LISTEN     0      128      :::8081         :::*
>>> >>
>>>  =======================================================================
>>> >>
>>> >>
>>> >>             My Debian version.
>>> >>
>>> >>
>>>  =======================================================================
>>> >>
>>> >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>>> >>             9.11
>>> >>
>>>  =======================================================================
>>> >>
>>> >>
>>> >>             My code snippet is below.
>>> >>
>>> >>
>>>  =======================================================================
>>> >>
>>> >>                  import apache_beam as beam
>>> >>                  from apache_beam.options.pipeline_options import PipelineOptions
>>> >>
>>> >>                  options = PipelineOptions([
>>> >>                                "--runner=FlinkRunner",
>>> >>                                "--flink_version=1.8",
>>> >>                                "--flink_master_url=localhost:8081"
>>> >>                            ])
>>> >>
>>> >>                  with beam.Pipeline(options=options) as p:
>>> >>                      (p | beam.Create(["Hello World"]))
>>> >>
>>>  =======================================================================
>>> >>
>>> >>
>>> >>             Are there any other settings I should look for?
>>> >>
>>> >>             Thanks,
>>> >>             Yu Watanabe
>>> >>
>>> >>             --
>>> >>             Yu Watanabe
>>> >>             Weekend Freelancer who loves to challenge building data
>>> >>             platform
>>> >>             yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>
>>> >>             LinkedIn icon <https://www.linkedin.com/in/yuwatanabe1>
>>> >>             Twitter icon <https://twitter.com/yuwtennis>
>>> >>
>>> >>
>>> >>
>>> >>     --
>>> >>     Yu Watanabe
>>> >>     Weekend Freelancer who loves to challenge building data platform
>>> >>     yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>
>>> >>     LinkedIn icon <https://www.linkedin.com/in/yuwatanabe1> Twitter
>>> icon
>>> >>     <https://twitter.com/yuwtennis>
>>> >>
>>>
>>
