On Fri, Feb 1, 2019 at 6:12 AM Maximilian Michels <[email protected]> wrote:
> Yes, I imagine sources would implement a JsonConfigurable interface
> (e.g. on their builders):
>
>   JsonConfigurable {
>     // Accepts either a JSON string or a Map<String, String>
>     apply(String jsonConfig);
>   }
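>
> For illustration, a minimal sketch of what this could look like on a
> source builder, using the Map<String, String> variant mentioned in the
> comment (PubsubReadBuilder and its fields are hypothetical, not
> existing Beam API):
>
>   import java.util.Map;
>
>   /** Hypothetical configuration interface for source builders. */
>   public interface JsonConfigurable {
>     void apply(Map<String, String> config);
>   }
>
>   /** Hypothetical PubSub read builder accepting a flat configuration. */
>   public class PubsubReadBuilder implements JsonConfigurable {
>
>     private String topic;
>     private String subscription;
>
>     @Override
>     public void apply(Map<String, String> config) {
>       // Route well-known keys to the same fields the builder's
>       // withTopic()/withSubscription()-style methods would set.
>       this.topic = config.get("topic");
>       this.subscription = config.get("subscription");
>     }
>   }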
>
> In Python we would create this transform:
>
>   URN: JsonConfiguredSource:v1
>   payload: {
>     environment: environment_id,   // Java/Python/Go
>     resourceIdentifier: string,    // "org.apache.beam.io.PubSubIO"
>     configuration: json config,    // { "topic" : "my_pubsub_topic" }
>   }
>
> That's more generic and could be used for other languages where we might
> have sources/sinks.
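>
> To make the handling concrete, a rough sketch of how an expansion
> service could act on such a payload, reusing the Map variant of
> JsonConfigurable sketched above (the expander class and its lookup
> logic are hypothetical, not existing Beam code):
>
>   import java.util.Map;
>
>   /** Hypothetical handler for a JsonConfiguredSource:v1 payload. */
>   public class JsonConfiguredSourceExpander {
>
>     public static JsonConfigurable expand(
>         String resourceIdentifier, Map<String, String> configuration)
>         throws ReflectiveOperationException {
>       // Instantiate the builder class named by the payload's
>       // resourceIdentifier, e.g. "org.apache.beam.io.PubSubIO".
>       Class<?> clazz = Class.forName(resourceIdentifier);
>       JsonConfigurable source =
>           (JsonConfigurable) clazz.getDeclaredConstructor().newInstance();
>       // Hand the payload's configuration to the source's builder.
>       source.apply(configuration);
>       return source;
>     }
>   }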
>
Looks good!
I imagine there would be a wrapper for this similar to:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py#L33
> > (FWIW, I was imagining PubSubIO already had a translation into
> > BeamFnApi protos that fully specified it, and we use that same format
> > to translate back out.)
>
> Not that I know of.
>
> On 01.02.19 14:02, Robert Bradshaw wrote:
> > Are you suggesting something akin to a generic
> >
> > urn: JsonConfiguredJavaSource
> > payload: some json specifying which source and which parameters
> >
> > which would expand to actually constructing and applying that source?
> >
> > (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> > protos that fully specified it, and we use that same format to translate
> > back out.)
> >
> > On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <[email protected]> wrote:
> >
> > Recapping here:
> >
> > We all agree that SDF is the way to go for future implementations of
> > sources. It enables us to get rid of the source interfaces. However,
> > SDF does not solve the lack of streaming sources in Python.
> >
> > The expansion PR (thanks btw!) solves the problem of
> > expanding/translating URNs known to an ExpansionService. That is a
> > more programmatic way of replacing language-specific transforms,
> > instead of relying on translators directly in the Runner.
> >
> > What is unsolved is the configuration of sources from a foreign
> > environment. In my opinion this is the most pressing issue for Python
> > sources, because what is PubSubIO worth in Python if you cannot
> > configure it?
> >
> > What about this:
> >
> > I think it is worth adding a JSON configuration option for all
> > existing Java sources. That way, we could easily configure them as
> > part of the expansion request (which would contain a JSON
> > configuration). I'll probably fork a thread to discuss this in more
> > detail, but would like to hear your thoughts.
> >
> > -Max
> >
> > On 01.02.19 13:08, Robert Bradshaw wrote:
> > > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <[email protected]> wrote:
> > >
> > > Ah, I thought you meant native Flink transforms.
> > >
> > > Exactly! The translation code is already there. The main challenge
> > > is how to programmatically configure the Beam IO from Python. I
> > > suppose that is also an unsolved problem for cross-language
> > > transforms in general.
> > >
> > >
> > > This is what https://github.com/apache/beam/pull/7316 does.
> > >
> > > For a particular source, one would want to define a URN and
> > > corresponding payload, then (probably) a CompositeTransform in
> > > Python that takes the user's arguments, packages them into the
> > > payload, applies the ExternalTransform, and returns the results.
> > > How to handle arbitrary UDFs embedded in sources is still TBD.
> > >
> > > For Matthias' pipeline with PubSubIO we can build something
> > > specific, but for the general case there should be a way to
> > > initialize a Beam IO via a configuration map provided by an
> > > external environment.
> > >
> > >
> > > I thought quite a bit about how we could represent expansions
> > > statically (e.g. have some kind of expansion template that could
> > > be used, at least in many cases, as data without firing up a
> > > separate process). Maybe worth doing eventually, but we run into
> > > the same issues that were discussed at
> > > https://github.com/apache/beam/pull/7316#discussion_r249996455 .
> > >
> > > If one is already using a portable runner like Flink, having the
> > > job service process automatically also serve up an expansion
> > > service for various URNs it knows and cares about is probably a
> > > pretty low bar. Flink could serve up things it would rather get
> > > back untouched in a transform with a special Flink runner URN.
> > >
> > > As Ahmet mentions, SDF is a better solution. I hope it's not that
> > > far away, but even once it comes we'll likely want the above
> > > framework to invoke the full suite of Java IOs even after they're
> > > running on SDF themselves.
> > >
> > > - Robert
> > >
> > > On 31.01.19 17:36, Thomas Weise wrote:
> > > > Exactly, that's what I had in mind.
> > > >
> > > > A Flink runner native transform would make the existing unbounded
> > > > sources available, similar to:
> > > >
> > > > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> > > >
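> > > > For illustration, the linked translators key native translations
> > > > by transform URN; a self-contained sketch of that shape (all
> > > > names here are made up, not the runner's real types):
> > > >
> > > >   import java.util.HashMap;
> > > >   import java.util.Map;
> > > >
> > > >   /** Hypothetical registry of URN-keyed native Flink translations. */
> > > >   public class NativeFlinkTranslations {
> > > >
> > > >     /** Stand-in for the runner's translator interface. */
> > > >     interface PTransformTranslator {
> > > >       void translate(String transformId);
> > > >     }
> > > >
> > > >     private final Map<String, PTransformTranslator> translators =
> > > >         new HashMap<>();
> > > >
> > > >     public NativeFlinkTranslations() {
> > > >       // A runner-recognized URN mapped to a native translation,
> > > >       // e.g. one that adds a Flink PubSub source directly.
> > > >       translators.put(
> > > >           "flink:transform:pubsub_read:v1", this::translatePubsubRead);
> > > >     }
> > > >
> > > >     private void translatePubsubRead(String transformId) {
> > > >       // Here the runner would attach a native Flink source to the
> > > >       // streaming execution environment for this transform.
> > > >     }
> > > >   }
> > > >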
> > > > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <[email protected]> wrote:
> > > >
> > > > Wouldn't it be even more useful for the transition period if we
> > > > enabled Beam IO to be used via Flink (like in the legacy Flink
> > > > Runner)? In this particular example, Matthias wants to use
> > > > PubSubIO, which is not even available as a native Flink transform.
> > > >
> > > > On 31.01.19 16:21, Thomas Weise wrote:
> > > > > Until SDF is supported, we could also add Flink runner native
> > > > > transforms for selected unbounded sources [1].
> > > > >
> > > > > That might be a reasonable option to unblock users that want to
> > > > > try Python streaming on Flink.
> > > > >
> > > > > Thomas
> > > > >
> > > > > [1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> > > > >
> > > > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <[email protected]> wrote:
> > > > >
> > > > > > I have a hard time imagining how we can map
> > > > > > RestrictionTrackers onto the existing Bounded/UnboundedSource
> > > > > > in a generic way, so I would love to hear more about the
> > > > > > details.
> > > > >
> > > > > Isn't it the other way around? The SDF is a generalization of
> > > > > UnboundedSource. So we would wrap UnboundedSource using SDF.
> > > > > I'm not saying it is trivial, but SDF offers all the
> > > > > functionality that UnboundedSource needs.
> > > > >
> > > > > For example, the @GetInitialRestriction method would call split
> > > > > on the UnboundedSource, and the restriction trackers would then
> > > > > be used to process the splits.
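> > > > >
> > > > > To make that concrete, a minimal sketch of such a wrapper where
> > > > > the restriction is simply a split of the wrapped source (class
> > > > > name, split count, and the elided tracker/processing parts are
> > > > > illustrative, not an actual Beam implementation):
> > > > >
> > > > >   import java.util.List;
> > > > >   import org.apache.beam.sdk.io.UnboundedSource;
> > > > >   import org.apache.beam.sdk.options.PipelineOptions;
> > > > >   import org.apache.beam.sdk.options.PipelineOptionsFactory;
> > > > >   import org.apache.beam.sdk.transforms.DoFn;
> > > > >
> > > > >   @DoFn.UnboundedPerElement
> > > > >   class UnboundedSourceAsSdfFn<
> > > > >           T, CheckpointT extends UnboundedSource.CheckpointMark>
> > > > >       extends DoFn<UnboundedSource<T, CheckpointT>, T> {
> > > > >
> > > > >     private static final int DESIRED_NUM_SPLITS = 4; // illustrative
> > > > >
> > > > >     // The initial restriction is the whole source.
> > > > >     @GetInitialRestriction
> > > > >     public UnboundedSource<T, CheckpointT> getInitialRestriction(
> > > > >         UnboundedSource<T, CheckpointT> source) {
> > > > >       return source;
> > > > >     }
> > > > >
> > > > >     // Splitting delegates to the source's own split() method.
> > > > >     @SplitRestriction
> > > > >     public void splitRestriction(
> > > > >         UnboundedSource<T, CheckpointT> source,
> > > > >         UnboundedSource<T, CheckpointT> restriction,
> > > > >         OutputReceiver<UnboundedSource<T, CheckpointT>> receiver)
> > > > >         throws Exception {
> > > > >       // Illustrative only: a real wrapper would thread the
> > > > >       // pipeline's actual options through.
> > > > >       PipelineOptions options = PipelineOptionsFactory.create();
> > > > >       List<? extends UnboundedSource<T, CheckpointT>> splits =
> > > > >           restriction.split(DESIRED_NUM_SPLITS, options);
> > > > >       for (UnboundedSource<T, CheckpointT> split : splits) {
> > > > >         receiver.output(split);
> > > > >       }
> > > > >     }
> > > > >
> > > > >     // @NewTracker and @ProcessElement (elided here) would open a
> > > > >     // reader via split.createReader(options, checkpointMark) and
> > > > >     // emit the records it advances over.
> > > > >   }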
> > > > >
> > > > > On 31.01.19 15:16, Ismaël Mejía wrote:
> > > > > >> Not necessarily. This would be one way. Another way is to
> > > > > >> build an SDF wrapper for UnboundedSource. Probably the
> > > > > >> easier path for migration.
> > > > > >
> > > > > > That would be fantastic. I have heard about such a wrapper
> > > > > > multiple times, but so far there has not been any realistic
> > > > > > proposal. I have a hard time imagining how we can map
> > > > > > RestrictionTrackers onto the existing Bounded/UnboundedSource
> > > > > > in a generic way, so I would love to hear more about the
> > > > > > details.
> > > > > >
> > > > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <[email protected]> wrote:
> > > > > >>
> > > > > >> > In addition to having support in the runners, this will
> > > > > >> > require a rewrite of PubsubIO to use the new SDF API.
> > > > > >>
> > > > > >> Not necessarily. This would be one way. Another way is to
> > > > > >> build an SDF wrapper for UnboundedSource. Probably the
> > > > > >> easier path for migration.
> > > > > >>
> > > > > >> On 31.01.19 14:03, Ismaël Mejía wrote:
> > > > > >>>> Fortunately, there is already a pending PR for
> > > > > >>>> cross-language pipelines which will allow us to use Java
> > > > > >>>> IOs like PubSub in Python jobs.
> > > > > >>>
> > > > > >>> In addition to having support in the runners, this will
> > > > > >>> require a rewrite of PubsubIO to use the new SDF API.
> > > > > >>>
> > > > > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <[email protected]> wrote:
> > > > > >>>>
> > > > > >>>> Hi Matthias,
> > > > > >>>>
> > > > > >>>> This is already reflected in the compatibility matrix, if
> > > > > >>>> you look under SDF. There is no UnboundedSource interface
> > > > > >>>> for portable pipelines. That's a legacy abstraction that
> > > > > >>>> will be replaced with SDF.
> > > > > >>>>
> > > > > >>>> Fortunately, there is already a pending PR for
> > > > > >>>> cross-language pipelines which will allow us to use Java
> > > > > >>>> IOs like PubSub in Python jobs.
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>> Max
> > > > > >>>>
> > > > > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> > > > > >>>>> Hey Ankur,
> > > > > >>>>>
> > > > > >>>>> Thanks for the swift reply. Should I change this in the
> > > > > >>>>> capability matrix
> > > > > >>>>> <https://s.apache.org/apache-beam-portability-support-table>
> > > > > >>>>> then?
> > > > > >>>>>
> > > > > >>>>> Many thanks.
> > > > > >>>>> Best,
> > > > > >>>>> Matthias
> > > > > >>>>>
> > > > > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <[email protected]> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Matthias,
> > > > > >>>>>
> > > > > >>>>> Unfortunately, unbounded reads, including PubSub, are
> > > > > >>>>> not yet supported for portable runners.
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> Ankur
> > > > > >>>>>
> > > > > >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <[email protected]> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi everyone,
> > > > > >>>>>
> > > > > >>>>> For the last few days I have been trying to run a
> > > > > >>>>> streaming pipeline (code on GitHub
> > > > > >>>>> <https://github.com/matthiasa4/beam-demo>) on a Flink
> > > > > >>>>> Runner.
> > > > > >>>>>
> > > > > >>>>> I am running a Flink cluster locally (v1.5.6
> > > > > >>>>> <https://flink.apache.org/downloads.html>).
> > > > > >>>>> I have built the SDK Harness Container:
> > > > > >>>>>   ./gradlew :beam-sdks-python-container:docker
> > > > > >>>>> and started the JobServer:
> > > > > >>>>>   ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> > > > > >>>>>     -PflinkMasterUrl=localhost:8081
> > > > > >>>>>
> > > > > >>>>> I run my pipeline with:
> > > > > >>>>>   env/bin/python streaming_pipeline.py
> > > > > >>>>>     --runner=PortableRunner --job_endpoint=localhost:8099
> > > > > >>>>>     --output xxx --input_subscription xxx
> > > > > >>>>>     --output_subscription xxx
> > > > > >>>>>
> > > > > >>>>> All of this is running inside Ubuntu (Bionic) in a
> > > > > >>>>> VirtualBox.
> > > > > >>>>>
> > > > > >>>>> The job submits fine, but unfortunately fails after a few
> > > > > >>>>> seconds with the error attached.
> > > > > >>>>>
> > > > > >>>>> Anything I am missing or doing wrong?
> > > > > >>>>>
> > > > > >>>>> Many thanks.
> > > > > >>>>> Best,
> > > > > >>>>> Matthias