Thanks for the explanation, Robert; it makes much more sense now. (Sorry
for the confusion in the mapping; I mistyped the direction SDF <->
Source.)

Status of SDF:
- Support for Dynamic Work Rebalancing is WIP.
- Bounded version translation is supported by all non-portable runners
in a relatively naive way.
- Unbounded version translation is not supported in the non-portable
runners. (Let's not forget that this case may make sense too).
- Portable runners translation of SDF is WIP.
- There is only one IO that is written based on SDF:
  - HBaseIO
- Some other IOs should work out of the box (those based on
non-splittable DoFn):
  - ClickhouseIO
  - File-based ones: TextIO, AvroIO, ParquetIO
  - JdbcIO
  - SolrIO
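
To make the Dynamic Work Rebalancing item above a bit more concrete, here
is a toy sketch of the idea: while a reader is claiming positions, the
runner may ask the tracker to give up part of the unclaimed work. All
names here (OffsetRange, ToyOffsetTracker, read_records) are hypothetical
and only model the concept; this is not Beam's actual API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetRange:
    """Half-open range [start, stop) of positions assigned to one reader."""
    start: int
    stop: int


class ToyOffsetTracker:
    """Hypothetical stand-in for a restriction tracker (not Beam's class)."""

    def __init__(self, restriction: OffsetRange):
        self.restriction = restriction
        self.last_claimed: Optional[int] = None

    def try_claim(self, position: int) -> bool:
        """Claim a position; False means it lies outside the restriction."""
        if position >= self.restriction.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self) -> Optional[OffsetRange]:
        """Give the second half of the unclaimed work back to the runner."""
        if self.last_claimed is None:
            next_pos = self.restriction.start
        else:
            next_pos = self.last_claimed + 1
        remaining = self.restriction.stop - next_pos
        if remaining < 2:
            return None  # too little work left to be worth splitting
        mid = next_pos + remaining // 2
        residual = OffsetRange(mid, self.restriction.stop)
        self.restriction = OffsetRange(self.restriction.start, mid)
        return residual


def read_records(records, tracker):
    """Emit records for as long as the tracker lets us claim the next offset."""
    position = tracker.restriction.start
    while tracker.try_claim(position):
        yield records[position]
        position += 1
```

In this toy model, a runner that never calls try_split simply reads every
restriction to completion; calling it mid-read shrinks the current
restriction and returns a residual that another worker could pick up.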

Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written; is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.
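
As a starting point for that discussion, here is a rough sketch of how a
JSON-configured source could be packaged into a URN plus payload and then
validated on the expansion side. The URN name is the example one from
Robert's mail quoted below; the "PubsubIO.Read" key, the KNOWN_SOURCES
registry, and both function names are assumptions made up purely for
illustration and do not correspond to any existing Beam API.

```python
import json

# Example URN name from the quoted mail; not a registered Beam URN.
JSON_SOURCE_URN = "JsonConfiguredJavaSource"

# Hypothetical registry of the sources an expansion service might know,
# mapped to the parameter names each one accepts.
KNOWN_SOURCES = {
    "PubsubIO.Read": {"subscription"},
}


def build_expansion_request(source, **params):
    """Package the user's arguments into a (urn, payload) pair."""
    payload = json.dumps({"source": source, "params": params}, sort_keys=True)
    return JSON_SOURCE_URN, payload.encode("utf-8")


def expand(urn, payload):
    """Validate the request and return the configuration to instantiate."""
    if urn != JSON_SOURCE_URN:
        raise ValueError(f"unknown URN: {urn}")
    config = json.loads(payload.decode("utf-8"))
    allowed = KNOWN_SOURCES.get(config["source"])
    if allowed is None:
        raise ValueError(f"unknown source: {config['source']}")
    unexpected = set(config["params"]) - allowed
    if unexpected:
        raise ValueError(f"unexpected parameters: {sorted(unexpected)}")
    return config
```

A real expansion service would construct and apply the Java source at the
point where this sketch just returns the parsed configuration; how UDFs
embedded in sources would be carried in such a payload is left open here.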

On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:
>
> Are you suggesting something akin to a generic
>
>     urn: JsonConfiguredJavaSource
>     payload: some json specifying which source and which parameters
>
> which would expand to actually constructing and applying that source?
>
> (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi 
> protos that fully specified it, and we use that same format to translate back 
> out.)
>
> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org> wrote:
>>
>> Recapping here:
>>
>> We all agree that SDF is the way to go for future implementations of
>> sources. It enables us to get rid of the source interfaces. However, SDF
>> does not solve the lack of streaming sources in Python.
>>
>> The expansion PR (thanks btw!) solves the problem of
>> expanding/translating URNs known to an ExpansionService. That is a more
>> programmatic way of replacing language-specific transforms, instead of
>> relying on translators directly in the Runner.
>>
>> What is unsolved is the configuration of sources from a foreign
>> environment. In my opinion this is the most pressing issue for Python
>> sources, because what is PubSubIO worth in Python if you cannot
>> configure it?
>>
>> What about this:
>>
>> I think it is worth adding a JSON configuration option for all existing
>> Java sources. That way, we could easily configure them as part of the
>> expansion request (which would contain a JSON configuration). I'll
>> probably fork a thread to discuss this in more detail, but would like to
>> hear your thoughts.
>>
>> -Max
>>
>> On 01.02.19 13:08, Robert Bradshaw wrote:
>> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >
>> >     Ah, I thought you meant native Flink transforms.
>> >
>> >     Exactly! The translation code is already there. The main challenge
>> >     is how to
>> >     programmatically configure the BeamIO from Python. I suppose that is
>> >     also an
>> >     unsolved problem for cross-language transforms in general.
>> >
>> >
>> > This is what https://github.com/apache/beam/pull/7316 does.
>> >
>> > For a particular source, one would want to define a URN and
>> > corresponding payload, then (probably) a CompositeTransform in Python
>> > that takes the user's arguments, packages them into the payload, applies
>> > the ExternalTransform, and returns the results. How to handle arbitrary
>> > UDFs embedded in sources is still TBD.
>> >
>> >     For Matthias' pipeline with PubSubIO we can build something
>> >     specific, but for
>> >     the general case there should be a way to initialize a Beam IO via a
>> >     configuration
>> >     map provided by an external environment.
>> >
>> >
>> > I thought quite a bit about how we could represent expansions statically
>> > (e.g. have some kind of expansion template that could be used, at least
>> > in many cases, as data without firing up a separate process. May be
>> > worth doing eventually, but we run into the same issues that were
>> > discussed at
>> > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
>> >
>> > If one is already using a portable runner like Flink, having the job
>> > service process automatically also serve up an expansion service for
>> > various URNs it knows and cares about is probably a pretty low bar.
>> > Flink could serve up things it would rather get back untouched in a
>> > transform with a special flink runner urn.
>> >
>> > As Ahmet mentions, SDF is a better solution. I hope it's not that far
>> > away, but even once it comes we'll likely want the above framework to
>> > invoke the full suite of Java IOs even after they're running on SDF
>> > themselves.
>> >
>> > - Robert
>> >
>> >     On 31.01.19 17:36, Thomas Weise wrote:
>> >      > Exactly, that's what I had in mind.
>> >      >
>> >      > A Flink runner native transform would make the existing unbounded
>> >     sources
>> >      > available, similar to:
>> >      >
>> >      >
>> >     
>> > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
>> >      >
>> >      >
>> >      >
>> >      >
>> >      > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
>> >      >
>> >      >     Wouldn't it be even more useful for the transition period if
>> >     we enabled Beam IO
>> >      >     to be used via Flink (like in the legacy Flink Runner)? In
>> >     this particular
>> >      >     example, Matthias wants to use PubSubIO, which is not even
>> >     available as a
>> >      >     native
>> >      >     Flink transform.
>> >      >
>> >      >     On 31.01.19 16:21, Thomas Weise wrote:
>> >      >      > Until SDF is supported, we could also add Flink runner
>> >     native transforms for
>> >      >      > selected unbounded sources [1].
>> >      >      >
>> >      >      > That might be a reasonable option to unblock users that
>> >     want to try Python
>> >      >      > streaming on Flink.
>> >      >      >
>> >      >      > Thomas
>> >      >      >
>> >      >      > [1]
>> >      >      >
>> >      >
>> >     
>> > https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
>> >      >      >
>> >      >      >
>> >      >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
>> >      >      >
>> >      >      >      > I have a hard time to imagine how can we map in a
>> >     generic way
>> >      >      >     RestrictionTrackers into the existing
>> >     Bounded/UnboundedSource, so I would
>> >      >      >     love to hear more about the details.
>> >      >      >
>> >      >      >     Isn't it the other way around? The SDF is a
>> >     generalization of
>> >      >     UnboundedSource.
>> >      >      >     So we would wrap UnboundedSource using SDF. I'm not
>> >     saying it is
>> >      >     trivial, but
>> >      >      >     SDF offers all the functionality that UnboundedSource
>> >     needs.
>> >      >      >
>> >      >      >     For example, the @GetInitialRestriction method would
>> >     call split on the
>> >      >      >     UnboundedSource and the restriction trackers would
>> >     then be used to
>> >      >     process the
>> >      >      >     splits.
>> >      >      >
>> >      >      >     On 31.01.19 15:16, Ismaël Mejía wrote:
>> >      >      >      >> Not necessarily. This would be one way. Another
>> >     way is build an SDF
>> >      >      >     wrapper for UnboundedSource. Probably the easier path
>> >     for migration.
>> >      >      >      >
>> >      >      >      > That would be fantastic, I have heard about such
>> >     wrapper multiple
>> >      >      >      > times but so far there is not any realistic
>> >     proposal. I have a hard
>> >      >      >      > time to imagine how can we map in a generic way
>> >     RestrictionTrackers
>> >      >      >      > into the existing Bounded/UnboundedSource, so I
>> >     would love to hear
>> >      >      >      > more about the details.
>> >      >      >      >
>> >      >      >      > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
>> >      >      >      >>
>> >      >      >      >>   > In addition to have support in the runners,
>> >     this will require a
>> >      >      >      >>   > rewrite of PubsubIO to use the new SDF API.
>> >      >      >      >>
>> >      >      >      >> Not necessarily. This would be one way. Another
>> >     way is build an SDF
>> >      >      >     wrapper for
>> >      >      >      >> UnboundedSource. Probably the easier path for
>> >     migration.
>> >      >      >      >>
>> >      >      >      >> On 31.01.19 14:03, Ismaël Mejía wrote:
>> >      >      >      >>>> Fortunately, there is already a pending PR for
>> >     cross-language
>> >      >      >     pipelines which
>> >      >      >      >>>> will allow us to use Java IO like PubSub in
>> >     Python jobs.
>> >      >      >      >>>
>> >      >      >      >>> In addition to have support in the runners, this
>> >     will require a
>> >      >      >      >>> rewrite of PubsubIO to use the new SDF API.
>> >      >      >      >>>
>> >      >      >      >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
>> >      >      >      >>>>
>> >      >      >      >>>> Hi Matthias,
>> >      >      >      >>>>
>> >      >      >      >>>> This is already reflected in the compatibility
>> >     matrix, if you look
>> >      >      >     under SDF.
>> >      >      >      >>>> There is no UnboundedSource interface for
>> >     portable pipelines.
>> >      >     That's a
>> >      >      >     legacy
>> >      >      >      >>>> abstraction that will be replaced with SDF.
>> >      >      >      >>>>
>> >      >      >      >>>> Fortunately, there is already a pending PR for
>> >     cross-language
>> >      >      >     pipelines which
>> >      >      >      >>>> will allow us to use Java IO like PubSub in
>> >     Python jobs.
>> >      >      >      >>>>
>> >      >      >      >>>> Thanks,
>> >      >      >      >>>> Max
>> >      >      >      >>>>
>> >      >      >      >>>> On 31.01.19 12:06, Matthias Baetens wrote:
>> >      >      >      >>>>> Hey Ankur,
>> >      >      >      >>>>>
>> >      >      >      >>>>> Thanks for the swift reply. Should I change
>> >     this in the
>> >      >     capability matrix
>> >      >      >      >>>>>
>> >     <https://s.apache.org/apache-beam-portability-support-table> then?
>> >      >      >      >>>>>
>> >      >      >      >>>>> Many thanks.
>> >      >      >      >>>>> Best,
>> >      >      >      >>>>> Matthias
>> >      >      >      >>>>>
>> >      >      >      >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
>> >      >      >      >>>>>
>> >      >      >      >>>>>       Hi Matthias,
>> >      >      >      >>>>>
>> >      >      >      >>>>>       Unfortunately, unbounded reads including
>> >     pubsub are not yet
>> >      >      >     supported for
>> >      >      >      >>>>>       portable runners.
>> >      >      >      >>>>>
>> >      >      >      >>>>>       Thanks,
>> >      >      >      >>>>>       Ankur
>> >      >      >      >>>>>
>> >      >      >      >>>>>       On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
>> >      >      >      >>>>>
>> >      >      >      >>>>>           Hi everyone,
>> >      >      >      >>>>>
>> >      >      >      >>>>>           Last few days I have been trying to
>> >     run a streaming
>> >      >      >     pipeline (code on
>> >      >      >      >>>>>           Github
>> >     <https://github.com/matthiasa4/beam-demo>) on a
>> >      >      >     Flink Runner.
>> >      >      >      >>>>>
>> >      >      >      >>>>>           I am running a Flink cluster locally
>> >     (v1.5.6
>> >      >      >      >>>>>
>> >       <https://flink.apache.org/downloads.html>)
>> >      >      >      >>>>>           I have built the SDK Harness
>> >     Container: /./gradlew
>> >      >      >      >>>>>           :beam-sdks-python-container:docker/
>> >      >      >      >>>>>           and started the JobServer: /./gradlew
>> >      >      >      >>>>>
>> >       :beam-runners-flink_2.11-job-server:runShadow
>> >      >      >      >>>>>           -PflinkMasterUrl=localhost:8081./
>> >      >      >      >>>>>
>> >      >      >      >>>>>           I run my pipeline with: /env/bin/python
>> >      >     streaming_pipeline.py
>> >      >      >      >>>>>           --runner=PortableRunner
>> >     --job_endpoint=localhost:8099
>> >      >      >     --output xxx
>> >      >      >      >>>>>           --input_subscription xxx
>> >     --output_subscription xxx/
>> >      >      >      >>>>>           /
>> >      >      >      >>>>>           /
>> >      >      >      >>>>>           All this is running inside a Ubuntu
>> >     (Bionic) in a
>> >      >     Virtualbox.
>> >      >      >      >>>>>
>> >      >      >      >>>>>           The job submits fine, but
>> >     unfortunately fails after
>> >      >     a few
>> >      >      >     seconds with
>> >      >      >      >>>>>           the error attached.
>> >      >      >      >>>>>
>> >      >      >      >>>>>           Anything I am missing or doing wrong?
>> >      >      >      >>>>>
>> >      >      >      >>>>>           Many thanks.
>> >      >      >      >>>>>           Best,
>> >      >      >      >>>>>           Matthias
>> >      >      >      >>>>>
>> >      >      >      >>>>>
>> >      >      >
>> >      >
>> >
