I believe the flag was never relevant for PortableRunner, though I might
be wrong. The flag affects a few bits in the core code, and that is why
the solution cannot simply be to set the flag in the Dataflow runner.
It requires some amount of cleanup. I agree that it would be good to
clean this up, and I also agree not to rush this, especially if it is
not currently impacting users.
Ahmet
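Ahmet's point that the flag reaches into core code, and Max's suggestion of handling it in the Dataflow runner instead, can be sketched with a minimal, hypothetical model. This is not Beam code: `core_expand_create`, `Runner`, `PortableRunner` and `LegacyDataflowRunner` are illustrative stand-ins, and the step names are placeholders. The core always emits the portable expansion with no flag check, and only a legacy runner swaps in its own source-based expansion by overriding one method. Beam has its own transform-override machinery; plain subclassing is used here only to keep the sketch self-contained.

```python
# Hypothetical model of the proposed cleanup; none of these names are Beam APIs.


def core_expand_create(values):
    """Core SDK: always emit the portable (Fn API) expansion, no flag check."""
    return ["Impulse", f"FlatMap(n={len(values)})", "Map(decode)"]


class Runner:
    """Hypothetical base runner that uses the core expansion unchanged."""

    def expand_create(self, values):
        return core_expand_create(values)


class PortableRunner(Runner):
    """Stand-in portable runner: needs no special handling at all."""


class LegacyDataflowRunner(Runner):
    """Stand-in legacy runner that keeps its own source-based Create."""

    def expand_create(self, values):
        # The legacy, non-portable behaviour lives only in this runner.
        return ["Read(CreateSource)"]


print(PortableRunner().expand_create(["Hello World"]))
# ['Impulse', 'FlatMap(n=1)', 'Map(decode)']
print(LegacyDataflowRunner().expand_create(["Hello World"]))
# ['Read(CreateSource)']
```

The design choice illustrated is that the portable path becomes the only path the core knows about, so no portable runner has to set, or can forget to set, a flag.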
On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels <m...@apache.org> wrote:
> I disagree that this flag is obsolete. It is still serving a purpose
> for batch users on the Dataflow runner, and that is a decent chunk of
> Beam Python users.
It is obsolete for the PortableRunner. If the Dataflow Runner needs this
flag, couldn't we simply add it there? As far as I know, Dataflow users
do not use the PortableRunner. I might be wrong.
As Kyle mentioned, he already fixed the issue. The fix is only present
in the 2.16.0 release though. This flag has repeatedly caused friction
for users and that's why I want to get rid of it.
There is of course no need to rush this but it would be great to tackle
this for the next release. Filed a JIRA:
https://jira.apache.org/jira/browse/BEAM-8274
Cheers,
Max
On 17.09.19 15:39, Kyle Weaver wrote:
> Actually, the reported issues are already fixed on head. We're just
> trying to prevent similar issues in the future.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:
>
> On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
>
> > Isn't this flag set automatically for the portable runner?
>
> Yes, the flag is set automatically, but it has been broken before and
> likely will be again. It just adds additional complexity to portable
> Runners. There is no other portability API than the Fn API. This flag
> historically had its justification, but seems obsolete now.
>
>
> I disagree that this flag is obsolete. It is still serving a purpose
> for batch users on the Dataflow runner, and that is a decent chunk of
> Beam Python users.
>
> I agree with switching the default. I would like to give enough time
> to decouple the flag from the core code (with a quick search I saw
> two instances, related to Read and Create), to test the changes, and
> then to switch the default.
>
>
> An isinstance check might be smarter, but it does not get rid of the
> root of the problem.
>
>
> I might be wrong, but IIUC it will temporarily resolve the reported
> issues. Is this not accurate?
>
>
> -Max
>
> On 17.09.19 14:20, Ahmet Altay wrote:
> > Could you make that change and see if it would have addressed the
> > issue here?
> >
> > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
> >
> > The flag is automatically set, but not in a smart way. Taking
> > another look at the code, a more resilient fix would be to just
> > check if the runner is an instance of PortableRunner.
> >
> >
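Kyle's isinstance suggestion can be sketched as below. The classes are stand-ins defined here so the snippet runs without Beam; the sketch assumes that runners such as FlinkRunner subclass PortableRunner, and the name-based check is hypothetical, included only for contrast. An exact class-name comparison misses subclasses, while isinstance matches them. As Max notes in his reply, this only hardens the check; the flag itself remains.

```python
# Stand-in classes (hypothetical); assumes FlinkRunner subclasses PortableRunner.
class PortableRunner:
    pass


class FlinkRunner(PortableRunner):
    pass


class DataflowRunner:
    pass


def is_portable_by_name(runner):
    """Fragile: matches only the exact class name 'PortableRunner'."""
    return type(runner).__name__ == "PortableRunner"


def is_portable_by_type(runner):
    """Resilient: also matches every subclass of PortableRunner."""
    return isinstance(runner, PortableRunner)


def with_fn_api(runner, experiments):
    """Return a copy of `experiments`, adding 'beam_fn_api' for portable runners."""
    result = list(experiments or [])
    if is_portable_by_type(runner) and "beam_fn_api" not in result:
        result.append("beam_fn_api")
    return result


print(is_portable_by_name(FlinkRunner()))  # False: the subclass is missed
print(is_portable_by_type(FlinkRunner()))  # True
print(with_fn_api(FlinkRunner(), None))  # ['beam_fn_api']
print(with_fn_api(DataflowRunner(), ["other"]))  # ['other']
```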
> > On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
> >
> > Isn't this flag set automatically for the portable runner here [1]?
> >
> > [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >
> > On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
> >
> > On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> > >
> > > +1 for making --experiments=beam_fn_api the default.
> > >
> > > Can the Dataflow runner driver just remove the setting if it is not
> > > compatible?
> >
> > The tricky bit would be undoing the differences in graph construction
> > due to this flag flip. But I would be in favor of changing the default
> > (probably just removing the flag) and moving the non-portability parts
> > into the Dataflow runner itself. (It looks like the key differences
> > here are for the Create and Read transforms.)
> >
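The graph-construction difference Robert and Ahmet describe can be illustrated with a simplified, hypothetical model: `expand_create` and the step names are illustrative placeholders, not Beam's actual Create implementation. The flag is read once, while the graph is built, so a runner driver stripping the setting afterwards (as Thomas asks about) does not change the steps that were already emitted. If the legacy branch does emit a Read of an SDK-side source, that would also be consistent with Yu's trace further down, where a pipeline containing only beam.Create fails in translateRead while deserializing a BoundedSource.

```python
# Hypothetical model: the flag is consulted while the graph is constructed.
FN_API = "beam_fn_api"


def expand_create(values, experiments):
    """Return the illustrative steps a Create would expand into."""
    if FN_API in (experiments or []):
        # Portable form: an Impulse plus SDK-side functions.
        return ["Impulse", f"FlatMap(n={len(values)})", "Map(decode)"]
    # Legacy form: a Read of a source object the runner must deserialize.
    return ["Read(CreateSource)"]


experiments = [FN_API]  # flag on by default, as proposed
graph = expand_create(["Hello World"], experiments)
experiments.remove(FN_API)  # a runner driver stripping the flag afterwards...
print(graph)  # ...still sees the portable steps
# ['Impulse', 'FlatMap(n=1)', 'Map(decode)']
print(expand_create(["Hello World"], experiments))
# ['Read(CreateSource)']
```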
> > > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
> > >>
> > >> +dev
> > >>
> > >> The beam_fn_api flag and the way it is automatically set is error-prone.
> > >> Is there anything that prevents us from removing it? I understand that
> > >> some Runners, e.g. the Dataflow Runner, have two modes of executing
> > >> Python pipelines (legacy and portable), but at this point it seems clear
> > >> that the portability mode should be the default.
> > >>
> > >> Cheers,
> > >> Max
> > >>
> > >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> > >>
> > >> Kyle
> > >>
> > >> Thank you for the assistance.
> > >>
> > >> By specifying "experiments" in PipelineOptions,
> > >>
> > >> ==========================================
> > >> from apache_beam.options.pipeline_options import PipelineOptions
> > >>
> > >> options = PipelineOptions([
> > >>     "--runner=FlinkRunner",
> > >>     "--flink_version=1.8",
> > >>     "--flink_master_url=localhost:8081",
> > >>     "--experiments=beam_fn_api"
> > >> ])
> > >> ==========================================
> > >>
> > >> I was able to submit the job successfully.
> > >>
> > >> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> > >> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
> > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
> > >> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
> > >> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> > >> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> > >> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> > >> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
> > >>
> > >> Thanks,
> > >> Yu Watanabe
> > >>
> > >> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
> > >>
> > >> Try adding "--experiments=beam_fn_api" to your pipeline options.
> > >> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
> > >>
> > >>
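Kyle's workaround adds one more option to the list Yu already passes to PipelineOptions. The stand-alone sketch below uses plain argparse rather than Beam's PipelineOptions and assumes that `--experiments` is a repeatable option whose values are collected into a list; it shows how such a list can be checked for beam_fn_api while the other options (here --runner, --flink_version and --flink_master_url) are passed through untouched. `has_experiment` is a hypothetical helper, not a Beam API.

```python
import argparse

# Assumption: --experiments is repeatable and its values collect into a list.
# This is a plain argparse stand-in, not Beam's PipelineOptions.
parser = argparse.ArgumentParser()
parser.add_argument("--experiments", dest="experiments", action="append",
                    default=None)


def has_experiment(argv, name):
    """Return True if `name` was passed via one or more --experiments flags."""
    known, _unknown = parser.parse_known_args(argv)
    return name in (known.experiments or [])


argv = [
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api",
]
print(has_experiment(argv, "beam_fn_api"))  # True
print(has_experiment(argv[:-1], "beam_fn_api"))  # False
```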
> > >> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> > >>
> > >> Hello.
> > >>
> > >> I am trying to spin up the Flink runner, but it looks like data
> > >> serialization is failing. I would like to ask for help getting past
> > >> this error.
> > >>
> > >>
> > >> ========================================================================
> > >> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> > >> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> > >>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> > >>     at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> > >>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> > >>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> > >>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> > >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >>     at java.lang.Thread.run(Thread.java:748)
> > >> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> > >>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> > >>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> > >>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> > >>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> > >>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> > >>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> > >>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> > >>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> > >>     ... 13 more
> > >> ========================================================================
> > >>
> > >> My Beam version is below.
> > >>
> > >>
> > >> =======================================================================
> > >> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> > >> apache-beam==2.15.0
> > >> =======================================================================
> > >>
> > >> I have my harness container ready in the registry.
> > >>
> > >>
> > >> =======================================================================
> > >> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> > >> NAME            DESCRIPTION   STARS   OFFICIAL   AUTOMATED
> > >> beam/python3                  0
> > >> =======================================================================
> > >>
> > >> Flink is ready on a separate cluster.
> > >>
> > >>
> > >> =======================================================================
> > >> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> > >> tcp   LISTEN   0   128   :::8081   :::*
> > >> =======================================================================
> > >>
> > >>
> > >> My Debian version.
> > >>
> > >>
> > >> =======================================================================
> > >> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> > >> 9.11
> > >> =======================================================================
> > >>
> > >>
> > >> My code snippet is below.
> > >>
> > >>
> > >> =======================================================================
> > >> import apache_beam as beam
> > >> from apache_beam.options.pipeline_options import PipelineOptions
> > >>
> > >> options = PipelineOptions([
> > >>     "--runner=FlinkRunner",
> > >>     "--flink_version=1.8",
> > >>     "--flink_master_url=localhost:8081"
> > >> ])
> > >>
> > >> with beam.Pipeline(options=options) as p:
> > >>     (p | beam.Create(["Hello World"]))
> > >> =======================================================================
> > >>
> > >>
> > >> Are there any other settings I should look at?
> > >>
> > >> Thanks,
> > >> Yu Watanabe
> > >>
> > >> --
> > >> Yu Watanabe
> > >> Weekend Freelancer who loves to challenge building data platform
> > >> yu.w.ten...@gmail.com
> > >> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
> > >> Twitter <https://twitter.com/yuwtennis>