+dev

The beam_fn_api flag and the way it is automatically set are error-prone. Is there anything that prevents us from removing it? I understand that some runners, e.g. the Dataflow Runner, have two modes of executing Python pipelines (legacy and portable), but at this point it seems clear that portable mode should be the default.
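The fragility described above (an option that must be added by hand, or that is added automatically only on some code paths) can be illustrated with a small idempotent helper. This is a hypothetical sketch, not Beam's code: `ensure_experiment` is an invented name. It assumes experiments are passed as separate `--experiments=<name>` arguments and does not parse other spellings or comma-separated values.

```python
from typing import List


def ensure_experiment(argv: List[str], name: str) -> List[str]:
    """Return a copy of argv that requests the given experiment.

    Hypothetical helper: the flag is appended only if the exact
    "--experiments=<name>" argument is missing, so calling it twice is harmless.
    """
    flag = "--experiments=" + name
    if flag in argv:
        # Already requested explicitly; return an unchanged copy.
        return list(argv)
    return list(argv) + [flag]


if __name__ == "__main__":
    base = [
        "--runner=FlinkRunner",
        "--flink_version=1.8",
        "--flink_master_url=localhost:8081",
    ]
    once = ensure_experiment(base, "beam_fn_api")
    twice = ensure_experiment(once, "beam_fn_api")
    print(twice)
```

Making the helper idempotent means every code path can call it without worrying whether another one already did, which is the property an automatically set flag needs.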

Cheers,
Max

On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:

   Kyle

   Thank you for the assistance.

   By specifying "experiments" in PipelineOptions,
   ==========================================
            from apache_beam.options.pipeline_options import PipelineOptions

            options = PipelineOptions([
                "--runner=FlinkRunner",
                "--flink_version=1.8",
                "--flink_master_url=localhost:8081",
                "--experiments=beam_fn_api",  # workaround for Beam 2.15
            ])
   ==========================================

   I was able to submit the job successfully.

   [grpc-default-executor-0] INFO
   org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
   BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
   [grpc-default-executor-0] INFO
   org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
   Starting job invocation
   BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
   [flink-runner-job-invoker] INFO
   org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
   pipeline to Flink program.
   [flink-runner-job-invoker] INFO
   org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
   a Batch Execution Environment.
   [flink-runner-job-invoker] INFO
   org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
   Flink Master URL localhost:8081.
   [flink-runner-job-invoker] WARN
   org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
   default parallelism could be found. Defaulting to parallelism 1.
   Please set an explicit parallelism with --parallelism
   [flink-runner-job-invoker] INFO
   org.apache.flink.api.java.ExecutionEnvironment - The job has 0
   registered types and 0 default Kryo serializers
   [flink-runner-job-invoker] INFO
   org.apache.flink.configuration.Configuration - Config uses fallback
   configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
   [flink-runner-job-invoker] INFO
   org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
   [flink-runner-job-invoker] INFO
   org.apache.flink.client.program.rest.RestClusterClient - Submitting
   job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
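The WARN line above asks for an explicit `--parallelism`. Unlike `--experiments`, this is a single-valued flag, so a helper should replace any earlier value rather than append another one. A minimal sketch, assuming the `--name=value` spelling; `set_option` is a hypothetical name and the value `2` is arbitrary. Whether the Python SDK of a given Beam release forwards `--parallelism` to the Flink job server is an assumption to verify for your version.

```python
from typing import List


def set_option(argv: List[str], name: str, value: object) -> List[str]:
    """Return a copy of argv where --<name> appears once, set to value.

    Hypothetical helper for single-valued flags such as --parallelism;
    it only recognises the "--name=value" spelling.
    """
    prefix = "--" + name + "="
    # Drop any earlier setting so there is exactly one, unambiguous value.
    kept = [arg for arg in argv if not arg.startswith(prefix)]
    return kept + [prefix + str(value)]


if __name__ == "__main__":
    argv = [
        "--runner=FlinkRunner",
        "--flink_version=1.8",
        "--flink_master_url=localhost:8081",
        "--experiments=beam_fn_api",
    ]
    print(set_option(argv, "parallelism", 2))
```

The resulting list can be passed to `PipelineOptions` in the same way as the options list above.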

   Thanks,
   Yu Watanabe

   On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:

       Try adding "--experiments=beam_fn_api" to your pipeline options.
       (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
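To my understanding, Beam's Python `DebugOptions` declares `--experiments` as a repeatable (append-style) flag, so adding `--experiments=beam_fn_api` adds to any experiments already requested instead of replacing them. The sketch below reproduces that behaviour with plain `argparse`; it is an illustration of the assumed declaration, not Beam's own parser, and `parse_experiments` is a hypothetical name.

```python
import argparse


def parse_experiments(argv):
    """Collect every --experiments value, mimicking a repeatable flag."""
    parser = argparse.ArgumentParser()
    # action="append" builds a list with one entry per occurrence of the flag.
    parser.add_argument("--experiments", action="append", default=None)
    # Ignore flags this toy parser does not define (runner, Flink options, ...).
    known, _unknown = parser.parse_known_args(argv)
    return known.experiments or []


if __name__ == "__main__":
    print(parse_experiments([
        "--runner=FlinkRunner",
        "--flink_master_url=localhost:8081",
        "--experiments=beam_fn_api",
    ]))
```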

       Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


       On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

           Hello.

           I am trying to spin up the Flink runner, but it looks like data
           serialization is failing.
           I would like to ask for help getting past this error.

           ========================================================================
           [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
           - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
           java.lang.IllegalArgumentException: unable to deserialize BoundedSource
               at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
               at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
               at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
               at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
               at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
               at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
               at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
               at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
           Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
               at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
               at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
               at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
               at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
               at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
               at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
               at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
               at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
               ... 13 more
           ========================================================================
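One plausible reading of the "Caused by" frames (not confirmed in the thread): `SerializableUtils.deserializeFromByteArray` wraps the payload in a `SnappyInputStream`, and `readHeader` falls through to `readFully`, which, as I understand snappy-java, happens when the bytes do not start with its stream magic header. The runner then tries to raw-decompress bytes that were never Snappy-compressed Java serialization, giving `FAILED_TO_UNCOMPRESS(5)`. The sketch checks for that header; the magic value is an assumption based on snappy-java's `SnappyCodec`, and the pickled object merely stands in for any payload not produced by the Java SDK.

```python
import pickle

# Assumed snappy-java stream magic: 0x82, the ASCII bytes "SNAPPY", then NUL.
SNAPPY_JAVA_MAGIC = b"\x82SNAPPY\x00"


def looks_like_snappy_java_stream(payload: bytes) -> bool:
    """Return True if payload starts with the snappy-java stream magic header."""
    return payload.startswith(SNAPPY_JAVA_MAGIC)


if __name__ == "__main__":
    # Magic followed by placeholder version fields; only the prefix matters here.
    framed = SNAPPY_JAVA_MAGIC + b"\x00\x00\x00\x01\x00\x00\x00\x01"
    foreign = pickle.dumps({"source": "Create"})
    print(looks_like_snappy_java_stream(framed))
    print(looks_like_snappy_java_stream(foreign))
```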

           My Beam version is below.

           =======================================================================
           (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
           apache-beam==2.15.0
           =======================================================================
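The same version check can be done from Python, which is handy for deciding whether the `beam_fn_api` workaround is needed. A minimal sketch assuming Python 3.8+ (`importlib.metadata`); `needs_fn_api_workaround` is a hypothetical name, and it flags only 2.15.x because that is the only release the thread identifies as affected.

```python
from importlib import metadata


def needs_fn_api_workaround(beam_version: str) -> bool:
    """Return True for the Beam release the thread reports as affected (2.15.x).

    Other versions are not covered by the thread, so they are not flagged.
    """
    major, minor = (int(part) for part in beam_version.split(".")[:2])
    return (major, minor) == (2, 15)


if __name__ == "__main__":
    try:
        installed = metadata.version("apache-beam")
    except metadata.PackageNotFoundError:
        installed = None
    if installed is None:
        print("apache-beam is not installed")
    else:
        print(installed, needs_fn_api_workaround(installed))
```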

           I have my harness container ready in the registry.

           =======================================================================
           ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
           NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
           beam/python3                            0
           =======================================================================
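To be explicit about which harness image a portable runner starts, the portable pipeline options `--environment_type` and `--environment_config` can name it; as far as I know `DOCKER` is already the default environment type, so this mainly matters for a non-default image. A sketch under those assumptions: `docker_environment_args` is a hypothetical helper, the image reference is assembled from the registry host and repository name in the `docker search` output above, and the flag names should be checked against your Beam version.

```python
from typing import List


def docker_environment_args(image: str) -> List[str]:
    """Build portable-runner options that point the SDK harness at `image`."""
    return [
        "--environment_type=DOCKER",
        "--environment_config=" + image,
    ]


if __name__ == "__main__":
    # Registry host plus repository name from the `docker search` output;
    # with no explicit tag, Docker resolves the reference to ":latest".
    image = "ywatanabe-docker-apache.bintray.io/beam/python3"
    argv = [
        "--runner=FlinkRunner",
        "--flink_version=1.8",
        "--flink_master_url=localhost:8081",
        "--experiments=beam_fn_api",
    ] + docker_environment_args(image)
    print(argv)
```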

           Flink is ready on a separate cluster.

           =======================================================================
           (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
           tcp    LISTEN     0      128      :::8081         :::*
           =======================================================================
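`ss` shows that something is listening on port 8081 on the machine where it runs. To confirm that the host and port used in `--flink_master_url` are reachable from the machine that submits the pipeline, a plain TCP connect is enough. A minimal standard-library sketch; `is_port_open` is a hypothetical name and the 2-second timeout is arbitrary.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, unresolvable host, and so on.
        return False


if __name__ == "__main__":
    print(is_port_open("localhost", 8081))
```

This only proves TCP reachability; it does not check that the listener is actually the Flink REST endpoint.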


           My Debian version:

           =======================================================================
           (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
           9.11
           =======================================================================


           My code snippet is below.

           =======================================================================
           import apache_beam as beam
           from apache_beam.options.pipeline_options import PipelineOptions

           options = PipelineOptions([
               "--runner=FlinkRunner",
               "--flink_version=1.8",
               "--flink_master_url=localhost:8081",
           ])

           with beam.Pipeline(options=options) as p:
               (p | beam.Create(["Hello World"]))
           =======================================================================


           Are there any other settings I should look at?

           Thanks,
           Yu Watanabe

-- Yu Watanabe
           Weekend Freelancer who loves to challenge building data
           platform
           yu.w.ten...@gmail.com
           LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
           Twitter <https://twitter.com/yuwtennis>


