Actually, the reported issues are already fixed on head. We're just trying to prevent similar issues in the future.
Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com

On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:
>
> On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
>>
>> > Is not this flag set automatically for the portable runner
>>
>> Yes, the flag is set automatically, but it has been broken before and
>> likely will be again. It just adds additional complexity to portable
>> runners. There is no other portability API than the Fn API. This flag
>> historically had its justification, but seems obsolete now.
>
> I disagree that this flag is obsolete. It is still serving a purpose for
> batch users on the Dataflow runner, and that is a decent chunk of Beam
> Python users.
>
> I agree with switching the default. I would like to give enough time to
> decouple the flag from the core code (with a quick search I saw two
> instances, related to Read and Create), have time to test the changes,
> and then switch the default.
>
>> An isinstance check might be smarter, but does not get rid of the root
>> of the problem.
>
> I might be wrong, but IIUC it would temporarily resolve the reported
> issues. Is this not accurate?
>
>> -Max
>>
>> On 17.09.19 14:20, Ahmet Altay wrote:
>>> Could you make that change and see if it would have addressed the
>>> issue here?
>>>
>>> On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>>> The flag is automatically set, but not in a smart way. Taking
>>>> another look at the code, a more resilient fix would be to just
>>>> check if the runner is an instance of PortableRunner.
>>>>
>>>> On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>> Is not this flag set automatically for the portable runner here
>>>>> [1]?
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>>>>>
>>>>> On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>> +1 for making --experiments=beam_fn_api the default.
>>>>>>>
>>>>>>> Can the Dataflow runner driver just remove the setting if it is
>>>>>>> not compatible?
>>>>>>
>>>>>> The tricky bit would be undoing the differences in graph
>>>>>> construction due to this flag flip. But I would be in favor of
>>>>>> changing the default (probably just removing the flag) and moving
>>>>>> the non-portability parts into the Dataflow runner itself. (It
>>>>>> looks like the key differences here are for the Create and Read
>>>>>> transforms.)
>>>>>>
>>>>>>> On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>
>>>>>>>> +dev
>>>>>>>>
>>>>>>>> The beam_fn_api flag and the way it is automatically set are
>>>>>>>> error-prone. Is there anything that prevents us from removing it?
>>>>>>>> I understand that some runners, e.g. the Dataflow runner, have
>>>>>>>> two modes of executing Python pipelines (legacy and portable),
>>>>>>>> but at this point it seems clear that the portability mode
>>>>>>>> should be the default.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Kyle
>>>>>>>>>
>>>>>>>>> Thank you for the assistance.
>>>>>>>>>
>>>>>>>>> By specifying "experiments" in PipelineOptions,
>>>>>>>>>
>>>>>>>>> ==========================================
>>>>>>>>> options = PipelineOptions([
>>>>>>>>>     "--runner=FlinkRunner",
>>>>>>>>>     "--flink_version=1.8",
>>>>>>>>>     "--flink_master_url=localhost:8081",
>>>>>>>>>     "--experiments=beam_fn_api"
>>>>>>>>> ])
>>>>>>>>> ==========================================
>>>>>>>>>
>>>>>>>>> I was able to submit the job successfully.
>>>>>>>>>
>>>>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>>>>>>>>> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1.
>>>>>>>>> Please set an explicit parallelism with --parallelism
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Yu Watanabe
>>>>>>>>>
>>>>>>>>> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Try adding "--experiments=beam_fn_api" to your pipeline options.
>>>>>>>>>> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
>>>>>>>>>>
>>>>>>>>>> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello.
>>>>>>>>>>>
>>>>>>>>>>> I am trying to spin up the Flink runner, but it looks like
>>>>>>>>>>> data serialization is failing.
>>>>>>>>>>> I would like to ask for help to get past this error.
>>>>>>>>>>>
>>>>>>>>>>> ========================================================================
>>>>>>>>>>> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>>>>>>>>>>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>>>>>>>>>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>>>>>>>>>>         at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>>>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>>>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>>>>>>>>>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>>>>>>>>>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>>>>>>>>>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>>>>>>>>>>         at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>>>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>>>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>>>>>>>>>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>>>>>>>>>>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>>>>>>>>>>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>>>>>>>>>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>>>>>>>>>>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>>>>>>>>>>         at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>>>>>>>>>>         at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>>>>>>>>>>         at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>>>>>>>>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>>>>>>>>>>         ... 13 more
>>>>>>>>>>> ========================================================================
>>>>>>>>>>>
>>>>>>>>>>> My Beam version is below.
>>>>>>>>>>>
>>>>>>>>>>> =======================================================================
>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>>>>>>>>>>> apache-beam==2.15.0
>>>>>>>>>>> =======================================================================
>>>>>>>>>>>
>>>>>>>>>>> I have my harness container ready on the registry.
>>>>>>>>>>>
>>>>>>>>>>> =======================================================================
>>>>>>>>>>> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>>>>>>>>>>> NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
>>>>>>>>>>> beam/python3                 0
>>>>>>>>>>> =======================================================================
>>>>>>>>>>>
>>>>>>>>>>> Flink is ready on a separate cluster.
>>>>>>>>>>>
>>>>>>>>>>> =======================================================================
>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>>>>>>>>>>> tcp    LISTEN     0      128      :::8081      :::*
>>>>>>>>>>> =======================================================================
>>>>>>>>>>>
>>>>>>>>>>> My Debian version.
>>>>>>>>>>>
>>>>>>>>>>> =======================================================================
>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>>>>>>>>>>> 9.11
>>>>>>>>>>> =======================================================================
>>>>>>>>>>>
>>>>>>>>>>> My code snippet is below.
>>>>>>>>>>>
>>>>>>>>>>> =======================================================================
>>>>>>>>>>> options = PipelineOptions([
>>>>>>>>>>>     "--runner=FlinkRunner",
>>>>>>>>>>>     "--flink_version=1.8",
>>>>>>>>>>>     "--flink_master_url=localhost:8081"
>>>>>>>>>>> ])
>>>>>>>>>>>
>>>>>>>>>>> with beam.Pipeline(options=options) as p:
>>>>>>>>>>>     (p | beam.Create(["Hello World"]))
>>>>>>>>>>> =======================================================================
>>>>>>>>>>>
>>>>>>>>>>> Are there any other settings I should look for?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>>>>> yu.w.ten...@gmail.com
>>>>>>>>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>>>>>>>>> Twitter: https://twitter.com/yuwtennis
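[Editor's note] For readers following the thread: below is a minimal, self-contained sketch of the isinstance-based check Kyle proposes above (replacing the automatic string-based setting of the beam_fn_api experiment with a check on the runner's type). The stand-in classes and the `experiments_for` helper are illustrative assumptions for this sketch, not the actual code in apache_beam/pipeline.py.

```python
# Illustrative sketch only -- not the real Beam implementation.
# Stand-ins for apache_beam.runners.runner.PipelineRunner and the
# portable runner class discussed in the thread.

class PipelineRunner:
    """Base class stand-in for a Beam pipeline runner."""


class PortableRunner(PipelineRunner):
    """Stand-in for a runner that executes pipelines via the Fn API."""


def experiments_for(runner, experiments=None):
    """Return the experiments list, adding beam_fn_api for portable runners.

    An isinstance check survives subclassing and renaming of runner
    classes, unlike matching on a runner name string, which is the
    fragility the thread is about.
    """
    experiments = list(experiments or [])
    if isinstance(runner, PortableRunner) and 'beam_fn_api' not in experiments:
        experiments.append('beam_fn_api')
    return experiments


print(experiments_for(PortableRunner()))  # ['beam_fn_api']
print(experiments_for(PipelineRunner()))  # []
```

In this sketch the flag is derived from the runner type at one place, so a user-supplied `--experiments=beam_fn_api` (as in Yu's workaround above) is preserved rather than duplicated.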