On Fri, Feb 1, 2019 at 6:12 AM Maximilian Michels <m...@apache.org> wrote:

> Yes, I imagine sources to implement a JsonConfigurable interface (e.g.
> on their builders):
>
> JsonConfigurable {
>    // Either a json string or Map<String, String>
>    apply(String jsonConfig);
> }
>
> In Python we would create this transform:
>
> URN: JsonConfiguredSource:v1
> payload: {
>     environment: environment_id, // Java/Python/Go
>     resourceIdentifier: string,  // "org.apache.beam.io.PubSubIO"
>     configuration: json config,  // { "topic" : "my_pubsub_topic" }
> }
>
> That's more generic and could be used for other languages where we might
> have sources/sinks.
>

Looks good!

I imagine there would be a wrapper for this similar to:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py#L33


> > (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> protos that fully specified it, and we use that same format to translate
> back out.)
>
> Not that I know of.
>
> On 01.02.19 14:02, Robert Bradshaw wrote:
> > Are you suggesting something akin to a generic
> >
> >      urn: JsonConfiguredJavaSource
> >      payload: some json specifying which source and which parameters
> >
> > which would expand to actually constructing and applying that source?
> >
> > (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> > protos that fully specified it, and we use that same format to translate
> > back out.)
> >
> > On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org
> > <mailto:m...@apache.org>> wrote:
> >
> >     Recaping here:
> >
> >     We all agree that SDF is the way to go for future implementations of
> >     sources. It enables us to get rid of the source interfaces. However,
> >     SDF
> >     does not solve the lack of streaming sources in Python.
> >
> >     The expansion PR (thanks btw!) solves the problem of
> >     expanding/translating URNs known to an ExpansionService. That is a
> more
> >     programmatic way of replacing language-specific performs, instead of
> >     relying on translators directly in the Runner.
> >
> >     What is unsolved is the configuration of sources from a foreign
> >     environment. In my opinion this is the most pressing issue for Python
> >     sources, because what is PubSubIO worth in Python if you cannot
> >     configure it?
> >
> >     What about this:
> >
> >     I think it is worth adding a JSON configuration option for all
> existing
> >     Java sources. That way, we could easily configure them as part of the
> >     expansion request (which would contain a JSON configuration). I'll
> >     probably fork a thread to discuss this in more detail, but would
> >     like to
> >     hear your thoughts.
> >
> >     -Max
> >
> >     On 01.02.19 13:08, Robert Bradshaw wrote:
> >      > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels
> >     <m...@apache.org <mailto:m...@apache.org>
> >      > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >      >
> >      >     Ah, I thought you meant native Flink transforms.
> >      >
> >      >     Exactly! The translation code is already there. The main
> >     challenge
> >      >     is how to
> >      >     programmatically configure the BeamIO from Python. I suppose
> >     that is
> >      >     also an
> >      >     unsolved problem for cross-language transforms in general.
> >      >
> >      >
> >      > This is what https://github.com/apache/beam/pull/7316 does.
> >      >
> >      > For a particular source, one would want to define a URN and
> >      > corresponding payload, then (probably) a CompositeTransform in
> >     Python
> >      > that takes the users arguments, packages them into the payload,
> >     applies
> >      > the ExternalTransform, and returns the results. How to handle
> >     arbitrary
> >      > UDFs embedded in sources is still TBD.
> >      >
> >      >     For Matthias' pipeline with PubSubIO we can build something
> >      >     specific, but for
> >      >     the general case there should be way to initialize a Beam IO
> >     via a
> >      >     configuration
> >      >     map provided by an external environment.
> >      >
> >      >
> >      > I thought quite a bit about how we could represent expansions
> >     statically
> >      > (e.g. have some kind of expansion template that could be used, at
> >     least
> >      > in many cases, as data without firing up a separate process. May
> be
> >      > worth doing eventually, but we run into the same issues that were
> >      > discussed at
> >      > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
> >      >
> >      > If one is already using a portable runner like Flink, having the
> job
> >      > service process automatically also serve up an expansion service
> for
> >      > various URNs it knows and cares about is probably a pretty low
> bar.
> >      > Flink could serve up things it would rather get back untouched in
> a
> >      > transform with a special flink runner urn.
> >      >
> >      > As Ahmet mentions, SDF is better solution. I hope it's not that
> far
> >      > away, but even once it comes we'll likely want the above
> >     framework to
> >      > invoke the full suite of Java IOs even after they're running on
> SDF
> >      > themselves.
> >      >
> >      > - Robert
> >      >
> >      >     On 31.01.19 17:36, Thomas Weise wrote:
> >      >      > Exactly, that's what I had in mind.
> >      >      >
> >      >      > A Flink runner native transform would make the existing
> >     unbounded
> >      >     sources
> >      >      > available, similar to:
> >      >      >
> >      >      >
> >      >
> >
> https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> >      >      >
> >      >      >
> >      >      >
> >      >      >
> >      >      > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels
> >      >     <m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >      > <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
> >      >      >
> >      >      >     Wouldn't it be even more useful for the transition
> >     period if
> >      >     we enabled Beam IO
> >      >      >     to be used via Flink (like in the legacy Flink
> Runner)? In
> >      >     this particular
> >      >      >     example, Matthias wants to use PubSubIO, which is not
> even
> >      >     available as a
> >      >      >     native
> >      >      >     Flink transform.
> >      >      >
> >      >      >     On 31.01.19 16:21, Thomas Weise wrote:
> >      >      >      > Until SDF is supported, we could also add Flink
> runner
> >      >     native transforms for
> >      >      >      > selected unbounded sources [1].
> >      >      >      >
> >      >      >      > That might be a reasonable option to unblock users
> that
> >      >     want to try Python
> >      >      >      > streaming on Flink.
> >      >      >      >
> >      >      >      > Thomas
> >      >      >      >
> >      >      >      > [1]
> >      >      >      >
> >      >      >
> >      >
> >
> https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> >      >      >      >
> >      >      >      >
> >      >      >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels
> >      >     <m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>
> >      >      >      > <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
> >      >      >      >
> >      >      >      >      > I have a hard time to imagine how can we map
> >     in a
> >      >     generic way
> >      >      >      >     RestrictionTrackers into the existing
> >      >     Bounded/UnboundedSource, so I would
> >      >      >      >     love to hear more about the details.
> >      >      >      >
> >      >      >      >     Isn't it the other way around? The SDF is a
> >      >     generalization of
> >      >      >     UnboundedSource.
> >      >      >      >     So we would wrap UnboundedSource using SDF. I'm
> not
> >      >     saying it is
> >      >      >     trivial, but
> >      >      >      >     SDF offers all the functionality that
> >     UnboundedSource
> >      >     needs.
> >      >      >      >
> >      >      >      >     For example, the @GetInitialRestriction method
> >     would
> >      >     call split on the
> >      >      >      >     UnboundedSource and the restriction trackers
> would
> >      >     then be used to
> >      >      >     process the
> >      >      >      >     splits.
> >      >      >      >
> >      >      >      >     On 31.01.19 15:16, Ismaël Mejía wrote:
> >      >      >      >      >> Not necessarily. This would be one way.
> Another
> >      >     way is build an SDF
> >      >      >      >     wrapper for UnboundedSource. Probably the
> >     easier path
> >      >     for migration.
> >      >      >      >      >
> >      >      >      >      > That would be fantastic, I have heard about
> such
> >      >     wrapper multiple
> >      >      >      >      > times but so far there is not any realistic
> >      >     proposal. I have a hard
> >      >      >      >      > time to imagine how can we map in a generic
> way
> >      >     RestrictionTrackers
> >      >      >      >      > into the existing Bounded/UnboundedSource,
> so I
> >      >     would love to hear
> >      >      >      >      > more about the details.
> >      >      >      >      >
> >      >      >      >      > On Thu, Jan 31, 2019 at 3:07 PM Maximilian
> >     Michels
> >      >     <m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>
> >      >      >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
> >      >      >      >      >>
> >      >      >      >      >>   > In addition to have support in the
> runners,
> >      >     this will require a
> >      >      >      >      >>   > rewrite of PubsubIO to use the new SDF
> API.
> >      >      >      >      >>
> >      >      >      >      >> Not necessarily. This would be one way.
> Another
> >      >     way is build an SDF
> >      >      >      >     wrapper for
> >      >      >      >      >> UnboundedSource. Probably the easier path
> for
> >      >     migration.
> >      >      >      >      >>
> >      >      >      >      >> On 31.01.19 14:03, Ismaël Mejía wrote:
> >      >      >      >      >>>> Fortunately, there is already a pending
> >     PR for
> >      >     cross-language
> >      >      >      >     pipelines which
> >      >      >      >      >>>> will allow us to use Java IO like PubSub
> in
> >      >     Python jobs.
> >      >      >      >      >>>
> >      >      >      >      >>> In addition to have support in the
> >     runners, this
> >      >     will require a
> >      >      >      >      >>> rewrite of PubsubIO to use the new SDF API.
> >      >      >      >      >>>
> >      >      >      >      >>> On Thu, Jan 31, 2019 at 12:23 PM
> >     Maximilian Michels
> >      >      >     <m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>
> >      >      >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
> >      >      >      >      >>>>
> >      >      >      >      >>>> Hi Matthias,
> >      >      >      >      >>>>
> >      >      >      >      >>>> This is already reflected in the
> >     compatibility
> >      >     matrix, if you look
> >      >      >      >     under SDF.
> >      >      >      >      >>>> There is no UnboundedSource interface for
> >      >     portable pipelines.
> >      >      >     That's a
> >      >      >      >     legacy
> >      >      >      >      >>>> abstraction that will be replaced with
> SDF.
> >      >      >      >      >>>>
> >      >      >      >      >>>> Fortunately, there is already a pending
> >     PR for
> >      >     cross-language
> >      >      >      >     pipelines which
> >      >      >      >      >>>> will allow us to use Java IO like PubSub
> in
> >      >     Python jobs.
> >      >      >      >      >>>>
> >      >      >      >      >>>> Thanks,
> >      >      >      >      >>>> Max
> >      >      >      >      >>>>
> >      >      >      >      >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> >      >      >      >      >>>>> Hey Ankur,
> >      >      >      >      >>>>>
> >      >      >      >      >>>>> Thanks for the swift reply. Should I
> change
> >      >     this in the
> >      >      >     capability matrix
> >      >      >      >      >>>>>
> >      >     <https://s.apache.org/apache-beam-portability-support-table>
> >     then?
> >      >      >      >      >>>>>
> >      >      >      >      >>>>> Many thanks.
> >      >      >      >      >>>>> Best,
> >      >      >      >      >>>>> Matthias
> >      >      >      >      >>>>>
> >      >      >      >      >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur
> Goenka
> >      >     <goe...@google.com <mailto:goe...@google.com>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>>
> >      >      >     <mailto:goe...@google.com <mailto:goe...@google.com>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>>>
> >      >      >      >     <mailto:goe...@google.com
> >     <mailto:goe...@google.com> <mailto:goe...@google.com
> >     <mailto:goe...@google.com>>
> >      >     <mailto:goe...@google.com <mailto:goe...@google.com>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>>>>
> >      >      >      >      >>>>> <mailto:goe...@google.com
> >     <mailto:goe...@google.com>
> >      >     <mailto:goe...@google.com <mailto:goe...@google.com>>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>
> >      >     <mailto:goe...@google.com <mailto:goe...@google.com>>>
> >      >      >     <mailto:goe...@google.com <mailto:goe...@google.com>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>>
> >      >     <mailto:goe...@google.com <mailto:goe...@google.com>
> >     <mailto:goe...@google.com <mailto:goe...@google.com>>>>>> wrote:
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>       Hi Matthias,
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>       Unfortunately, unbounded reads
> >     including
> >      >     pubsub are not yet
> >      >      >      >     supported for
> >      >      >      >      >>>>>       portable runners.
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>       Thanks,
> >      >      >      >      >>>>>       Ankur
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>       On Thu, Jan 31, 2019 at 2:44 PM
> >     Matthias
> >      >     Baetens
> >      >      >      >     <baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>
> >     <mailto:baetensmatth...@gmail.com <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>>
> >      >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>
> >     <mailto:baetensmatth...@gmail.com <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>>>
> >      >      >      >      >>>>>       <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>
> >      >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>>
> >      >      >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>
> >      >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>
> >      >     <mailto:baetensmatth...@gmail.com
> >     <mailto:baetensmatth...@gmail.com>>>>>> wrote:
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           Hi everyone,
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           Last few days I have been
> >     trying to
> >      >     run a streaming
> >      >      >      >     pipeline (code on
> >      >      >      >      >>>>>           Github
> >      >     <https://github.com/matthiasa4/beam-demo>) on a
> >      >      >      >     Flink Runner.
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           I am running a Flink cluster
> >     locally
> >      >     (v1.5.6
> >      >      >      >      >>>>>
> >      >       <https://flink.apache.org/downloads.html>)
> >      >      >      >      >>>>>           I have built the SDK Harness
> >      >     Container: /./gradlew
> >      >      >      >      >>>>>
> >       :beam-sdks-python-container:docker/
> >      >      >      >      >>>>>           and started the JobServer:
> >     /./gradlew
> >      >      >      >      >>>>>
> >      >       :beam-runners-flink_2.11-job-server:runShadow
> >      >      >      >      >>>>>
>  -PflinkMasterUrl=localhost:8081./
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           I run my pipeline with:
> >     /env/bin/python
> >      >      >     streaming_pipeline.py
> >      >      >      >      >>>>>           --runner=PortableRunner
> >      >     --job_endpoint=localhost:8099
> >      >      >      >     --output xxx
> >      >      >      >      >>>>>           --input_subscription xxx
> >      >     --output_subscription xxx/
> >      >      >      >      >>>>>           /
> >      >      >      >      >>>>>           /
> >      >      >      >      >>>>>           All this is running inside a
> >     Ubuntu
> >      >     (Bionic) in a
> >      >      >     Virtualbox.
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           The job submits fine, but
> >      >     unfortunately fails after
> >      >      >     a few
> >      >      >      >     seconds with
> >      >      >      >      >>>>>           the error attached.
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           Anything I am missing or doing
> >     wrong?
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>           Many thanks.
> >      >      >      >      >>>>>           Best,
> >      >      >      >      >>>>>           Matthias
> >      >      >      >      >>>>>
> >      >      >      >      >>>>>
> >      >      >      >
> >      >      >
> >      >
> >
>

Reply via email to