I believe the flag was never relevant for the PortableRunner, though I might be wrong about that. The flag affects a few bits in the core code, which is why the solution cannot simply be to set the flag in the Dataflow runner; it requires some cleanup first. I agree that it would be good to clean this up, and I also agree that we should not rush it, especially if it is not currently impacting users.
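The point that the flag is read by core code while the graph is being built, so a runner cannot simply set it afterwards, can be illustrated with a toy model. Everything below is hypothetical: `Options`, the step names, and all helper functions are stand-ins rather than Beam's API, and the Create/Read branching only mirrors the difference mentioned later in the thread.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical step names, used only for this illustration.
PORTABLE_CREATE = ["Impulse", "FlatMap"]
LEGACY_CREATE = ["Read(CreateSource)"]


@dataclass
class Options:
    """Hypothetical stand-in for the experiments list in pipeline options."""
    experiments: List[str] = field(default_factory=list)


def expand_create_flagged(options: Options) -> List[str]:
    """Current shape: core construction branches on the experiment."""
    if "beam_fn_api" in options.experiments:
        return list(PORTABLE_CREATE)
    return list(LEGACY_CREATE)


def runner_sets_flag_late(steps: List[str], options: Options) -> List[str]:
    """A runner that only sets the flag when it runs an already-built graph."""
    if "beam_fn_api" not in options.experiments:
        options.experiments.append("beam_fn_api")
    # The steps were fixed at construction time; the new flag value is ignored.
    return steps


def expand_create_portable() -> List[str]:
    """Proposed shape: core always builds the portable expansion."""
    return list(PORTABLE_CREATE)


def legacy_runner_override(steps: List[str]) -> List[str]:
    """Proposed shape: a legacy runner rewrites the portable form itself."""
    if steps == PORTABLE_CREATE:
        return list(LEGACY_CREATE)
    return steps


opts = Options()
built = expand_create_flagged(opts)
print(runner_sets_flag_late(built, opts))  # ['Read(CreateSource)']
print(opts.experiments)                    # ['beam_fn_api']
print(legacy_runner_override(expand_create_portable()))  # ['Read(CreateSource)']
```

The last two functions sketch the direction Robert suggests in the thread (core always builds the portable form and the legacy runner rewrites it), again only as an illustration under these assumptions.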
Ahmet

On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels <m...@apache.org> wrote:
>> I disagree that this flag is obsolete. It is still serving a purpose for batch users using the Dataflow runner, and that is a decent chunk of Beam Python users.
>
> It is obsolete for the PortableRunner. If the Dataflow Runner needs this flag, couldn't we simply add it there? As far as I know Dataflow users do not use the PortableRunner. I might be wrong.
>
> As Kyle mentioned, he already fixed the issue. The fix is only present in the 2.16.0 release though. This flag has repeatedly caused friction for users and that's why I want to get rid of it.
>
> There is of course no need to rush this but it would be great to tackle this for the next release. Filed a JIRA:
> https://jira.apache.org/jira/browse/BEAM-8274
>
> Cheers,
> Max
>
> On 17.09.19 15:39, Kyle Weaver wrote:
>> Actually, the reported issues are already fixed on head. We're just trying to prevent similar issues in the future.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>> On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:
>>> On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
>>>>> Is not this flag set automatically for the portable runner
>>>>
>>>> Yes, the flag is set automatically, but it has been broken before and likely will be again. It just adds additional complexity to portable Runners. There is no other portability API than the Fn API. This flag historically had its justification, but seems obsolete now.
>>>
>>> I disagree that this flag is obsolete. It is still serving a purpose for batch users using the Dataflow runner, and that is a decent chunk of Beam Python users.
>>>
>>> I agree with switching the default.
>>> I would like to give enough time to decouple the flag from the core code (with a quick search I saw two instances, related to Read and Create), test the changes, and then switch the default.
>>>
>>>> An isinstance check might be smarter, but does not get rid of the root of the problem.
>>>
>>> I might be wrong, but IIUC it will temporarily resolve the reported issues. Is that not accurate?
>>>
>>>> -Max
>>>>
>>>> On 17.09.19 14:20, Ahmet Altay wrote:
>>>>> Could you make that change and see if it would have addressed the issue here?
>>>>>
>>>>> On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
>>>>>> The flag is automatically set, but not in a smart way. Taking another look at the code, a more resilient fix would be to just check if the runner isinstance of PortableRunner.
>>>>>>
>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>>>>
>>>>>> On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
>>>>>>> Is not this flag set automatically for the portable runner here [1]?
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
>>>>>>>
>>>>>>> On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> +1 for making --experiments=beam_fn_api default.
>>>>>>>>>
>>>>>>>>> Can the Dataflow runner driver just remove the setting if it is not compatible?
>>>>>>>>
>>>>>>>> The tricky bit would be undoing the differences in graph construction due to this flag flip. But I would be in favor of changing the default (probably just removing the flag) and moving the non-portability parts into the dataflow runner itself. (It looks like the key differences here are for the Create and Read transforms.)
>>>>>>>>
>>>>>>>>> On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> +dev
>>>>>>>>>>
>>>>>>>>>> The beam_fn_api flag and the way it is automatically set is error-prone. Is there anything that prevents us from removing it? I understand that some Runners, e.g. Dataflow Runner have two modes of executing Python pipelines (legacy and portable), but at this point it seems clear that the portability mode should be the default.
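Kyle's suggestion to key the automatic setting off the runner's type rather than its name can be sketched as follows. This is a toy example, not the code at `pipeline.py#L160`: the classes are empty stand-ins named after Beam's runners (with `FlinkRunner` assumed to subclass `PortableRunner` in this sketch), and `PORTABLE_RUNNER_NAMES` is a hypothetical list used only to show how a name-based check misses subclasses.

```python
from typing import Iterable, List, Optional, Union


class PipelineRunner:
    """Hypothetical stand-in for a runner base class."""


class PortableRunner(PipelineRunner):
    """Hypothetical stand-in for the portable runner."""


class FlinkRunner(PortableRunner):
    """Stand-in, assumed here to subclass PortableRunner."""


class DataflowRunner(PipelineRunner):
    """Stand-in for a runner that also has a legacy (non-portable) mode."""


# Hypothetical hard-coded list, as a name-based check would need.
PORTABLE_RUNNER_NAMES = ["PortableRunner"]


def is_portable_by_name(runner: Union[str, PipelineRunner]) -> bool:
    """Name-based check: misses any portable runner not listed by name."""
    name = runner if isinstance(runner, str) else type(runner).__name__
    return name in PORTABLE_RUNNER_NAMES


def is_portable_by_type(runner: PipelineRunner) -> bool:
    """Type-based check: covers every subclass of PortableRunner."""
    return isinstance(runner, PortableRunner)


def default_experiments(runner: PipelineRunner,
                        experiments: Optional[Iterable[str]]) -> List[str]:
    """Add beam_fn_api for portable runners without duplicating it."""
    result = list(experiments or [])
    if is_portable_by_type(runner) and "beam_fn_api" not in result:
        result.append("beam_fn_api")
    return result


print(is_portable_by_name(FlinkRunner()))         # False
print(is_portable_by_type(FlinkRunner()))         # True
print(default_experiments(DataflowRunner(), []))  # []
```

As Max points out in the thread, a check like this makes the automatic setting more robust but does not remove the flag itself.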
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Kyle
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the assistance.
>>>>>>>>>>>
>>>>>>>>>>> By specifying "experiments" in PipelineOptions,
>>>>>>>>>>> ==========================================
>>>>>>>>>>> options = PipelineOptions([
>>>>>>>>>>>     "--runner=FlinkRunner",
>>>>>>>>>>>     "--flink_version=1.8",
>>>>>>>>>>>     "--flink_master_url=localhost:8081",
>>>>>>>>>>>     "--experiments=beam_fn_api"
>>>>>>>>>>> ])
>>>>>>>>>>> ==========================================
>>>>>>>>>>>
>>>>>>>>>>> I was able to submit the job successfully.
>>>>>>>>>>>
>>>>>>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>>>>>>> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>>>>>>>>>>> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1.
>>>>>>>>>>> Please set an explicit parallelism with --parallelism
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>>>>>>>>>>> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
>>>>>>>>>>>> Try adding "--experiments=beam_fn_api" to your pipeline options. (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
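Kyle's workaround amounts to making sure `beam_fn_api` appears among the experiments passed to the pipeline. A minimal sketch using only `argparse`, assuming a repeatable `--experiments` flag (declared here with `action="append"`; this parser and both helper names are hypothetical, not Beam's own), shows how the experiment can be added idempotently to an argument list like the one in Yu's snippet.

```python
import argparse
from typing import List


def parse_experiments(argv: List[str]) -> List[str]:
    """Collect every --experiments value from argv (hypothetical parser)."""
    parser = argparse.ArgumentParser(add_help=False)
    # action="append" lets the flag be repeated; default=None means "not given".
    parser.add_argument("--experiments", action="append", default=None)
    # Runner flags such as --flink_master_url are not modeled here, so ignore them.
    known, _unknown = parser.parse_known_args(argv)
    return known.experiments or []


def with_experiment(argv: List[str], name: str) -> List[str]:
    """Return a copy of argv that requests `name` exactly once."""
    if name in parse_experiments(argv):
        return list(argv)
    return list(argv) + [f"--experiments={name}"]


args = [
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
]
fixed = with_experiment(args, "beam_fn_api")
print(fixed[-1])                 # --experiments=beam_fn_api
print(parse_experiments(fixed))  # ['beam_fn_api']
```

In practice the resulting list would be passed to `PipelineOptions` just as in Yu's working snippet; the helper only avoids adding the flag twice.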
>>>>>>>>>>>>
>>>>>>>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>>>>>>>>>>> Hello.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am trying to spin up the Flink runner, but it looks like data serialization is failing. I would like to ask for help getting past this error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ========================================================================
>>>>>>>>>>>>> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>>>>>>>>>>>>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>>>>>>>>>>>>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>>>>>>>>>>>>     at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>>>>>>>>>>>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>>>>>>>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>>>>>>>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>>>>>>>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>>>>>>>>>>>>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>>>>>>>>>>>>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>>>>>>>>>>>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>>>>>>>>>>>>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>>>>>>>>>>>>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>>>>>>>>>>>>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>>>>>>>>>>>>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>>>>>>>>>>>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>>>>>>>>>>>>     ... 13 more
>>>>>>>>>>>>> ========================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> My Beam version is below.
>>>>>>>>>>>>>
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>>>>>>>>>>>>> apache-beam==2.15.0
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have my harness container ready on the registry.
>>>>>>>>>>>>>
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>>>>>>>>>>>>> NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
>>>>>>>>>>>>> beam/python3                 0
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> Flink is ready on a separate cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>>>>>>>>>>>>> tcp   LISTEN   0   128   :::8081   :::*
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> My Debian version:
>>>>>>>>>>>>>
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>>>>>>>>>>>>> 9.11
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> My code snippet is below.
>>>>>>>>>>>>>
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>
>>>>>>>>>>>>> options = PipelineOptions([
>>>>>>>>>>>>>     "--runner=FlinkRunner",
>>>>>>>>>>>>>     "--flink_version=1.8",
>>>>>>>>>>>>>     "--flink_master_url=localhost:8081"
>>>>>>>>>>>>> ])
>>>>>>>>>>>>>
>>>>>>>>>>>>> with beam.Pipeline(options=options) as p:
>>>>>>>>>>>>>     (p | beam.Create(["Hello World"]))
>>>>>>>>>>>>> =======================================================================
>>>>>>>>>>>>>
>>>>>>>>>>>>> Are there any other settings I should look at?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>>>>>>> yu.w.ten...@gmail.com
>>>>>>>>>>>>> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
>>>>>>>>>>>>> Twitter <https://twitter.com/yuwtennis>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Yu Watanabe
>>>>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>>>>> yu.w.ten...@gmail.com
>>>>>>>>>>> LinkedIn <https://www.linkedin.com/in/yuwatanabe1> Twitter <https://twitter.com/yuwtennis>