On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:

> Ah, I thought you meant native Flink transforms.
>
> Exactly! The translation code is already there. The main challenge is
> how to programmatically configure the BeamIO from Python. I suppose that
> is also an unsolved problem for cross-language transforms in general.
>

This is what https://github.com/apache/beam/pull/7316 does.

For a particular source, one would want to define a URN and corresponding
payload, then (probably) a CompositeTransform in Python that takes the
user's arguments, packages them into the payload, applies the
ExternalTransform, and returns the results. How to handle arbitrary UDFs
embedded in sources is still TBD.
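
As a rough sketch (assuming the ExternalTransform API from that PR; the
URN, payload encoding, and default expansion service address below are
hypothetical placeholders), such a wrapper might look like:

    # Hypothetical sketch of a Python wrapper around a Java IO; not a
    # finalized API. The composite packages the user's arguments into a
    # payload and defers the actual expansion to an external (Java)
    # expansion service.
    import apache_beam as beam
    from apache_beam.transforms import external

    READ_PUBSUB_URN = "beam:external:java:pubsub:read:v1"  # made-up URN

    class ReadFromJavaPubSub(beam.PTransform):

        def __init__(self, subscription, expansion_service="localhost:8097"):
            self._subscription = subscription
            self._expansion_service = expansion_service

        def expand(self, pbegin):
            payload = self._subscription.encode("utf-8")  # placeholder encoding
            return pbegin | external.ExternalTransform(
                READ_PUBSUB_URN, payload, self._expansion_service)

A user would then write something like
p | ReadFromJavaPubSub('projects/.../subscriptions/...') without ever
touching the URN or payload directly.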


> For Matthias' pipeline with PubSubIO we can build something specific,
> but for the general case there should be a way to initialize a Beam IO
> via a configuration map provided by an external environment.
>

I thought quite a bit about how we could represent expansions statically
(e.g. as some kind of expansion template that could be used, at least in
many cases, as data without firing up a separate process). It may be
worth doing eventually, but we run into the same issues that were
discussed at
https://github.com/apache/beam/pull/7316#discussion_r249996455 .
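
To illustrate what "expansion as data" could mean (all names below are
hypothetical):

    # A static expansion template sketched as plain data: a runner or SDK
    # could fill in the parameters without calling out to a separate
    # expansion process. Environments, coders, and embedded UDFs are what
    # make this harder than it looks; see the linked discussion.
    PUBSUB_READ_TEMPLATE = {
        "urn": "beam:external:java:pubsub:read:v1",  # made-up URN
        "parameters": ["subscription", "timestamp_attribute"],
        "expanded_transforms": [],  # pre-expanded, parameterized transform protos
    }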

If one is already using a portable runner like Flink, having the job
service process automatically serve up an expansion service as well, for
the various URNs it knows and cares about, is probably a pretty low bar.
Flink could even serve up transforms it would rather get back untouched,
wrapped in a special Flink-runner URN.

As Ahmet mentions, SDF is the better solution. I hope it's not that far
away, but even once it arrives we'll likely still want the above
framework to invoke the full suite of Java IOs, even after they're
running on SDF themselves.
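
To make the wrapper idea discussed further down in the thread a bit more
concrete, here is a conceptual Python sketch built on the SDF primitives
(all names are made up, and it uses the bounded-source API for brevity; a
real unbounded wrapper would also need watermark and checkpointing
support):

    # Conceptual only: wrap a source by making "which split to read" the
    # restriction. initial_restriction plays the role of
    # @GetInitialRestriction: it splits the wrapped source, and the
    # tracker then claims and processes individual splits.
    import apache_beam as beam
    from apache_beam.io.restriction_trackers import (
        OffsetRange, OffsetRestrictionTracker)
    from apache_beam.transforms.core import RestrictionProvider

    BUNDLE_SIZE = 64 << 20  # arbitrary desired bundle size, in bytes

    class _SplitRestrictionProvider(RestrictionProvider):

        def initial_restriction(self, source):
            # The restriction is the range of split indices.
            return OffsetRange(0, len(list(source.split(BUNDLE_SIZE))))

        def create_tracker(self, restriction):
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, source, restriction):
            return restriction.size()

    class WrapSourceAsSDF(beam.DoFn):

        def process(
            self,
            source,
            tracker=beam.DoFn.RestrictionParam(_SplitRestrictionProvider())):
            # Re-split deterministically; a real implementation would cache
            # the splits rather than recompute them.
            splits = list(source.split(BUNDLE_SIZE))
            restriction = tracker.current_restriction()
            for i in range(restriction.start, restriction.stop):
                if not tracker.try_claim(i):
                    return
                bundle = splits[i]
                range_tracker = bundle.source.get_range_tracker(
                    bundle.start_position, bundle.stop_position)
                for record in bundle.source.read(range_tracker):
                    yield record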

- Robert



> On 31.01.19 17:36, Thomas Weise wrote:
> > Exactly, that's what I had in mind.
> >
> > A Flink runner native transform would make the existing unbounded
> > sources available, similar to:
> >
> > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> >
> >
> >
> >
> > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
> >
> >     Wouldn't it be even more useful for the transition period if we
> >     enabled Beam IO to be used via Flink (like in the legacy Flink
> >     Runner)? In this particular example, Matthias wants to use PubSubIO,
> >     which is not even available as a native Flink transform.
> >
> >     On 31.01.19 16:21, Thomas Weise wrote:
> >      > Until SDF is supported, we could also add Flink runner native
> >      > transforms for selected unbounded sources [1].
> >      >
> >      > That might be a reasonable option to unblock users that want to
> >      > try Python streaming on Flink.
> >      >
> >      > Thomas
> >      >
> >      > [1]
> >      > https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> >      >
> >      >
> >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
> >      >
> >      >      > I have a hard time imagining how we can map
> >      >      > RestrictionTrackers in a generic way onto the existing
> >      >      > Bounded/UnboundedSource, so I would love to hear more
> >      >      > about the details.
> >      >
> >      >     Isn't it the other way around? The SDF is a generalization
> >      >     of UnboundedSource. So we would wrap UnboundedSource using
> >      >     SDF. I'm not saying it is trivial, but SDF offers all the
> >      >     functionality that UnboundedSource needs.
> >      >
> >      >     For example, the @GetInitialRestriction method would call
> >      >     split on the UnboundedSource, and the restriction trackers
> >      >     would then be used to process the splits.
> >      >
> >      >     On 31.01.19 15:16, Ismaël Mejía wrote:
> >      >      >> Not necessarily. This would be one way. Another way is to
> >      >      >> build an SDF wrapper for UnboundedSource. Probably the
> >      >      >> easier path for migration.
> >      >      >
> >      >      > That would be fantastic. I have heard about such a wrapper
> >      >      > multiple times, but so far there hasn't been any realistic
> >      >      > proposal. I have a hard time imagining how we can map
> >      >      > RestrictionTrackers in a generic way onto the existing
> >      >      > Bounded/UnboundedSource, so I would love to hear more
> >      >      > about the details.
> >      >      >
> >      >      > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
> >      >      >>
> >      >      >>   > In addition to having support in the runners, this
> >      >      >>   > will require a rewrite of PubsubIO to use the new
> >      >      >>   > SDF API.
> >      >      >>
> >      >      >> Not necessarily. This would be one way. Another way is to
> >      >      >> build an SDF wrapper for UnboundedSource. Probably the
> >      >      >> easier path for migration.
> >      >      >>
> >      >      >> On 31.01.19 14:03, Ismaël Mejía wrote:
> >      >      >>>> Fortunately, there is already a pending PR for
> >      >      >>>> cross-language pipelines which will allow us to use
> >      >      >>>> Java IO like PubSub in Python jobs.
> >      >      >>>
> >      >      >>> In addition to having support in the runners, this will
> >      >      >>> require a rewrite of PubsubIO to use the new SDF API.
> >      >      >>>
> >      >      >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
> >      >      >>>>
> >      >      >>>> Hi Matthias,
> >      >      >>>>
> >      >      >>>> This is already reflected in the compatibility matrix,
> >      >      >>>> if you look under SDF. There is no UnboundedSource
> >      >      >>>> interface for portable pipelines. That's a legacy
> >      >      >>>> abstraction that will be replaced with SDF.
> >      >      >>>>
> >      >      >>>> Fortunately, there is already a pending PR for
> >      >      >>>> cross-language pipelines which will allow us to use
> >      >      >>>> Java IO like PubSub in Python jobs.
> >      >      >>>>
> >      >      >>>> Thanks,
> >      >      >>>> Max
> >      >      >>>>
> >      >      >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> >      >      >>>>> Hey Ankur,
> >      >      >>>>>
> >      >      >>>>> Thanks for the swift reply. Should I change this in
> >      >      >>>>> the capability matrix
> >      >      >>>>> <https://s.apache.org/apache-beam-portability-support-table>
> >      >      >>>>> then?
> >      >      >>>>>
> >      >      >>>>> Many thanks.
> >      >      >>>>> Best,
> >      >      >>>>> Matthias
> >      >      >>>>>
> >      >      >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
> >      >      >>>>>
> >      >      >>>>>       Hi Matthias,
> >      >      >>>>>
> >      >      >>>>>       Unfortunately, unbounded reads including pubsub
> >      >      >>>>>       are not yet supported for portable runners.
> >      >      >>>>>
> >      >      >>>>>       Thanks,
> >      >      >>>>>       Ankur
> >      >      >>>>>
> >      >      >>>>>       On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
> >      >      >>>>>
> >      >      >>>>>           Hi everyone,
> >      >      >>>>>
> >      >      >>>>>           For the last few days I have been trying to
> >      >      >>>>>           run a streaming pipeline (code on GitHub
> >      >      >>>>>           <https://github.com/matthiasa4/beam-demo>) on
> >      >      >>>>>           a Flink Runner.
> >      >      >>>>>
> >      >      >>>>>           I am running a Flink cluster locally (v1.5.6
> >      >      >>>>>           <https://flink.apache.org/downloads.html>).
> >      >      >>>>>           I have built the SDK Harness Container with:
> >      >      >>>>>           ./gradlew :beam-sdks-python-container:docker
> >      >      >>>>>           and started the JobServer with: ./gradlew
> >      >      >>>>>           :beam-runners-flink_2.11-job-server:runShadow
> >      >      >>>>>           -PflinkMasterUrl=localhost:8081
> >      >      >>>>>
> >      >      >>>>>           I run my pipeline with: env/bin/python
> >      >      >>>>>           streaming_pipeline.py --runner=PortableRunner
> >      >      >>>>>           --job_endpoint=localhost:8099 --output xxx
> >      >      >>>>>           --input_subscription xxx
> >      >      >>>>>           --output_subscription xxx
> >      >      >>>>>
> >      >      >>>>>           All this is running inside an Ubuntu (Bionic)
> >      >      >>>>>           VM in VirtualBox.
> >      >      >>>>>
> >      >      >>>>>           The job submits fine, but unfortunately
> >      >      >>>>>           fails after a few seconds with the error
> >      >      >>>>>           attached.
> >      >      >>>>>
> >      >      >>>>>           Anything I am missing or doing wrong?
> >      >      >>>>>
> >      >      >>>>>           Many thanks.
> >      >      >>>>>           Best,
> >      >      >>>>>           Matthias
> >      >      >>>>>
> >      >      >>>>>
> >      >
> >
>
