Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
  - R is the OffsetRange restriction which in this case will be always of
the form [startOffset, inf).
  - there'll be just 1 value per key, but we use GBK to just get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code) producing a residual
restriction R1' (basically a new start timestamp), put R11 into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into state
of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already do
- state and timers, but no new concept of SDF executor (and consequently no
necessity to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I have another question that I think wasn't addressed in the meeting. At
> least it wasn't mentioned in the notes.
>
> In the context of replacing sources by a combination of to SDFs, how do you
> determine how many "SDF executor" instances you need downstream? For the
> sake of argument assume that both SDFs are executed with parallelism 1 (or
> one per worker). Now, if you have a file source that reads from a static
> set of files the first SDF would emit the filenames while the second SDF
> would receive the filenames and emit their contents. This works well and
> the downstream SDF can process one filename after the other. Now, think of
> something like a Kafka source. The first SDF would emit the partitions (say
> 4 partitions, in this example) and the second SDF would be responsible for
> reading from a topic and emitting elements. Reading from one topic never
> finishes so you can't process the topics in series. I think you would need
> to have 4 downstream "SDF executor" instances. The question now is: how do
> you determine whether you are in the first or the second situation?
>
> Probably I'm just overlooking something and this is already dealt with
> somewhere... :-)
>
> Cheers,
> Aljoscha
>
> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:
>
> > Hello,
> >
> > Thanks for the notes both Dan and Eugene, and for taking the time to do
> the
> > presentation and  answer our questions.
> >
> > I mentioned the ongoing work on dynamic scaling on Flink because I
> suppose
> > that it will address dynamic rebalancing eventually (there are multiple
> > changes going on for dynamic scaling).
> >
> >
> >
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> >
> > https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8
> >
> > Anyway I am far from an expert on flink, but probably the flink guys can
> > give their opinion about this and refer to a more precise document that
> the
> > ones I mentioned..
> >
> > ​Thanks again,
> > Ismaël​
> >
> > On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > Great summary Eugene and Dan.
> > >
> > > And thanks again for the details, explanation, and discussion.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > >
> > >> Thanks for attending, everybody!
> > >>
> > >> Here are meeting notes (thanks Dan!).
> > >>
> > >> Q: Will SplittableDoFn enable better repartitioning of the
> input/output
> > >> data?
> > >> A: Not really; repartitioning is orthogonal to SDF.
> > >>
> > >> Current Source API suffers from lack of composition and scalability
> > >> because
> > >> we treat sources too much as metadata, not enough as data.
> > >>
> > >> Q(slide with transform expansion): who does the "magic"?
> > >> A: The runner. Checkpointing and dynamically splitting restrictions
> will
> > >> require collaboration with the runner.
> > >>
> > >> Q: How does the runner interact with the DoFn to control the
> > restrictions?
> > >> Is it related to the centralized job tracker etc.?
> > >> A: RestrictionTracker is a simple helper object, that exists purely on
> > the
> > >> worker while executing a single partition, and interacts with the
> worker
> > >> harness part of the runner. Not to be confused with the centralized
> job
> > >> tracker (master) - completely unrelated. Worker harness, of course,
> > >> interacts with the master in some relevant ways (e.g. Dataflow master
> > can
> > >> tell "you're a straggler, you should split").
> > >>
> > >> Q: Is this a new DoFn subclass, or how will this integrate with the
> > >> existing code?
> > >> A: It's a feature of reflection-based DoFn (
> > https://s.apache.org/a-new-do
> > >> fn)
> > >> - just another optional parameter of type RestrictionTracker to
> > >> processElement() which is dynamically bound via reflection, so fully
> > >> backward/forward compatible, and looks to users like a regular DoFn.
> > >>
> > >> Q: why is fractionClaimed a double?
> > >> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
> > >> rebalancing) requires a uniform way to represent progress through
> > >> different
> > >> sources.
> > >>
> > >> Q: Spark runner is microbatch-based, so this seems to map well onto
> > >> checkpoint/resume, right?
> > >> A: Yes; actually the Dataflow runner is, at a worker level, also
> > >> microbatch-based. The way SDF interacts with a runner will be very
> > similar
> > >> to how a Bounded/UnboundedSource interacts with a runner.
> > >>
> > >> Q: Using SDF, what would be the "packaging" of the IO?
> > >> A: Same as currently: package IO's as PTransforms and their
> > implementation
> > >> under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
> > >> Datastore was recently refactored from BoundedSource to ParDo (ended
> up
> > >> simpler and more scalable), transparently to users.
> > >>
> > >> Q: What's the timeline; what to do with the IOs currently in
> > development?
> > >> A: Timeline is O(months). Keep doing what you're doing and working on
> > top
> > >> of Source APIs when necessary and simple ParDo's otherwise.
> > >>
> > >> Q: What's the impact for the runner writers?
> > >> A: Tentatively expected that most of the code for running an SDF will
> be
> > >> common to runners, with some amount of per-runner glue code, just like
> > >> GBK/windowing/triggering. Impact on Dataflow runner is larger since it
> > >> supports dynamic rebalancing in batch mode and this is the hardest
> part,
> > >> but for other runners shouldn't be too hard.
> > >>
> > >> JB: Talend has people who can help with this: e.g. help integrate into
> > >> Spark runner, refactor IOs etc. Amit also willing to chat about
> > supporting
> > >> SDF in Spark runner.
> > >>
> > >> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael
> will
> > >> send a link.
> > >>
> > >>
> > >> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net
> >
> > >> wrote:
> > >>
> > >> Hi Eugene,
> > >>>
> > >>> thanks for the reminder.
> > >>>
> > >>> Just to prepare some topics for the call, please find some points:
> > >>>
> > >>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds to
> me
> > >>> that we can keep the IO packaging style (using with* setters for the
> IO
> > >>> configuration) and replace PTransform, Source, Reader, ... directly
> > with
> > >>> SDF. Correct ?
> > >>>
> > >>> 2. What's your plan in term of release to include SDF ? We have
> several
> > >>> IOs in preparation and I wonder if it's worth to start to use the new
> > >>> SDF API or not.
> > >>>
> > >>> 3. What's the impact for the runner writers ? The runners will have
> to
> > >>> support SDF, that could be tricky depending of the execution engine.
> In
> > >>> the worst case where the runner can't fully support SDF, does it mean
> > >>> that most of our IOs will be useless ?
> > >>>
> > >>> Just my dumb topics ;)
> > >>>
> > >>> Thanks,
> > >>> See you at 8am !
> > >>>
> > >>> Regards
> > >>> JB
> > >>>
> > >>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > >>>
> > >>>> Hello everybody,
> > >>>>
> > >>>> Just a reminder:
> > >>>>
> > >>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to
> > >>>> join
> > >>>> the call go to
> > >>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
> > >>>> I intend to go over the proposed design and then have a free-form
> > >>>> discussion.
> > >>>>
> > >>>> Please have a skim through the proposal doc: https://s.apache.org/
> > >>>> splittable-do-fn
> > >>>> I also made some slides that are basically a trimmed-down version of
> > the
> > >>>> doc to use as a guide when conducting the meeting,
> > >>>>
> > >>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
> > >>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> > >>>
> > >>>> .
> > >>>>
> > >>>> I will post notes from the meeting on this thread afterwards.
> > >>>>
> > >>>> Thanks, looking forward.
> > >>>>
> > >>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
> > >>>> <dhalp...@google.com.invalid
> > >>>>
> > >>>> wrote:
> > >>>>
> > >>>> This is pretty cool! I'll be there too. (unless the hangout gets too
> > >>>>>
> > >>>> full
> > >>>
> > >>>> -- if so, I'll drop out in favor of others who aren't lucky enough
> to
> > >>>>>
> > >>>> get
> > >>>
> > >>>> to talk to Eugene all the time.)
> > >>>>>
> > >>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
> > >>>>>
> > >>>> psaltis.and...@gmail.com>
> > >>>
> > >>>> wrote:
> > >>>>>
> > >>>>> +1 I'll join
> > >>>>>>
> > >>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
> > >>>>>>
> > >>>>> apban...@cisco.com
> > >>>>>
> > >>>>>>
> > >>>>>>> wrote:
> > >>>>>>
> > >>>>>> + 1, me2
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com
> > >>>>>>>
> > >>>>>> <javascript:;>>
> > >>>
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>> +1 as in I'll join ;-)
> > >>>>>>>>
> > >>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> > >>>>>>>>
> > >>>>>>> <kirpic...@google.com.invalid
> > >>>>>>
> > >>>>>>>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> Sounds good, thanks!
> > >>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > >>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
> > >>>>>>>>>
> > >>>>>>>> com/splittabledofn
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
> > >>>>>>>>>
> > >>>>>>>> j...@nanthrax.net
> > >>>>>>
> > >>>>>>> <javascript:;>>
> > >>>>>>>
> > >>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi
> > >>>>>>>>>>
> > >>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about
> > >>>>>>>>>>
> > >>>>>>>>> Friday
> > >>>>>
> > >>>>>> 19th ?
> > >>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Regards
> > >>>>>>>>>> JB
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > >>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sounds great, does the suggested time over videoconference
> work
> > >>>>>>>>>>>
> > >>>>>>>>>> for
> > >>>>>
> > >>>>>> you?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
> > >>>>>>>>>>>
> > >>>>>>>>>> j...@nanthrax.net <javascript:;>>
> > >>>>>>>
> > >>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Eugene
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> May we talk together next week ? I like the proposal. I
> would
> > >>>>>>>>>>>>
> > >>>>>>>>>>> just
> > >>>>>>
> > >>>>>>> need
> > >>>>>>>>>>>
> > >>>>>>>>>>>> some details for my understanding.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks
> > >>>>>>>>>>>> Regards
> > >>>>>>>>>>>> JB
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > >>>>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What are your thoughts on this?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain
> more
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> about
> > >>>>>>>
> > >>>>>>>> this
> > >>>>>>>>>>>>> proposal if necessary, since I understand it is a lot to
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> digest.
> > >>>>>
> > >>>>>>
> > >>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over
> > Hangouts?
> > >>>>>>>>>>>>> (link:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
> > >>>>>>>>>
> > >>>>>>>> com/splittabledofn
> > >>>>>>>
> > >>>>>>>> -
> > >>>>>>>>>>>>> I confirmed that it can be joined without being logged
> into a
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> Google
> > >>>>>>>
> > >>>>>>>> account)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Who'd be interested in attending, and does this time/date
> > work
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> for
> > >>>>>>
> > >>>>>>> people?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> <kirpic...@google.com <javascript:;>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> It sounds like you are concerned about continued support
> for
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> existing
> > >>>>>>>>>>>
> > >>>>>>>>>>>> IO's
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> people have developed, and about backward compatibility?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> We do not need to remove the Source API, and all existing
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> Source-based
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> connectors will continue to work [though the document
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> proposes
> > >>>>>
> > >>>>>> at
> > >>>>>>>
> > >>>>>>>> some
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> point to make Read.from(Source) to translate to a wrapper
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> SDF
> > >>>>>
> > >>>>>> under
> > >>>>>>>>>>>
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> hood, to exercise the feature more and to make sure that
> it
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> is
> > >>>>>
> > >>>>>> strictly
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> more powerful - but this is an optional implementation
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> detail].
> > >>>>>>
> > >>>>>>>
> > >>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
> "replacing
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> the
> > >>>>>>
> > >>>>>>> Source
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API so
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> powerful
> > >>>>>>>>>>>
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> Source
> > >>>>>>
> > >>>>>>> API
> > >>>>>>>>>>>
> > >>>>>>>>>>>> all
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the time, even though they don't have to" :) And we can
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> discuss
> > >>>>>>
> > >>>>>>> whether or
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> not to actually deprecate/remove the Source API at some
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> point
> > >>>>>
> > >>>>>> down
> > >>>>>>>
> > >>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> road, once it becomes clear whether this is the case or
> not.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To give more context: this proposal came out of
> discussions
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> within
> > >>>>>>>
> > >>>>>>>> the SDK
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam project
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> existed,
> > >>>>>>
> > >>>>>>> on
> > >>>>>>>
> > >>>>>>>> how to
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> make major improvements to the Source API; perhaps it will
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> clarify
> > >>>>>>>
> > >>>>>>>> things
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> if I give a history of the ideas discussed:
> > >>>>>>>>>>>>>> - The first idea was to introduce a
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> Read.from(PCollection<Source>)
> > >>>>>>>
> > >>>>>>>> transform while keeping the Source API intact - this, given
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> appropriate
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> implementation, would solve most of the scalability and
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> composability
> > >>>>>>>>>>>
> > >>>>>>>>>>>> issues of IO's. Then most connectors would look like :
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> ParDo<A,
> > >>>>>>
> > >>>>>>> Source<B>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> + Read.from().
> > >>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> abstraction, as
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> it simply holds data. What if we only had a Reader<S, B>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> class
> > >>>>>
> > >>>>>> where
> > >>>>>>>>>>>
> > >>>>>>>>>>>> S is
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the source type and B the output type? Then connectors
> would
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> be
> > >>>>>>
> > >>>>>>> something
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> > >>>>>>>>>>>>>> - Then somebody remarked that some of the features of
> Source
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> are
> > >>>>>>
> > >>>>>>> useful to
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress when
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> processing a
> > >>>>>>>
> > >>>>>>>> very
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> heavy element, or ability to produce very large output in
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> parallel.
> > >>>>>>>>>>>
> > >>>>>>>>>>>> - The two previous bullets were already hinting that the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> Read.using()
> > >>>>>>>>>>>
> > >>>>>>>>>>>> primitive might not be so special: it just takes S and
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> produces
> > >>>>>>
> > >>>>>>> B:
> > >>>>>>>
> > >>>>>>>> isn't
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> that what a ParDo does, plus some source magic, minus the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> convenience
> > >>>>>>>>>>>
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
> > >>>>>>>>>>>>>> - At this point it became clear that we should explore
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> unifying
> > >>>>>>
> > >>>>>>> sources
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> sources
> > >>>>>
> > >>>>>> to
> > >>>>>>
> > >>>>>>> ParDo's
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> but without the limitations and coding inconveniences? And
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> this
> > >>>>>>
> > >>>>>>> is
> > >>>>>>>
> > >>>>>>>> how
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a DoFn
> by
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> providing
> > >>>>>>>>>>>
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> RangeTracker.
> > >>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> clear
> > >>>>>>
> > >>>>>>> that
> > >>>>>>>>>>>
> > >>>>>>>>>>>> it
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> is strictly more general than sources; at least, in the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> respect
> > >>>>>>
> > >>>>>>> that
> > >>>>>>>>>>>
> > >>>>>>>>>>>> sources have to produce output, while DoFn's don't: an SDF
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> may
> > >>>>>
> > >>>>>> very
> > >>>>>>>>>>>
> > >>>>>>>>>>>> well
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> produce no output at all, and simply perform a side effect
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> in
> > >>>>>
> > >>>>>> a
> > >>>>>>
> > >>>>>>> parallel/resumable way.
> > >>>>>>>>>>>>>> - Then there were countless hours of discussions on
> unifying
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> the
> > >>>>>>
> > >>>>>>> bounded/unbounded cases, on the particulars of RangeTracker
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> APIs
> > >>>>>>
> > >>>>>>> reconciling parallelization and checkpointing, what the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> relation
> > >>>>>>
> > >>>>>>> between
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the current
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> proposal.
> > >>>>>>>>>>>
> > >>>>>>>>>>>> The
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> proposal comes at a time when a couple of key ingredients
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> are
> > >>>>>
> > >>>>>> (almost)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn, and
> the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> State/Timers
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> proposal to enable unbounded work per element.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To put it shortly:
> > >>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and
> will
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> support
> > >>>>>>>>>>>
> > >>>>>>>>>>>> writing new ones, possibly forever. There is no interference
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> with
> > >>>>>>>
> > >>>>>>>> current
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> users of Source.
> > >>>>>>>>>>>>>> - The new API is an attempt to improve the Source API,
> taken
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> to
> > >>>>>>
> > >>>>>>> its
> > >>>>>>>>>>>
> > >>>>>>>>>>>> logical limit where it turns out that users' goals can be
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> accomplished
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> <j...@nanthrax.net <javascript:;>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Eugene,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
> improvement
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> of
> > >>>>>
> > >>>>>> Source
> > >>>>>>>>>>>
> > >>>>>>>>>>>> ?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> If I understand correctly, it means that we will have to
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> refactore
> > >>>>>>>>>>>
> > >>>>>>>>>>>> all
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> existing IO: basically, what you propose is to remove all
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Source
> > >>>>>>>
> > >>>>>>>> to
> > >>>>>>>>>>>
> > >>>>>>>>>>>> replace with NewDoFn.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm concern with this approach, especially in term of
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> timing:
> > >>>>>
> > >>>>>> clearly,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the IO is the area where we have to move forward in Beam
> as
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> it
> > >>>>>>
> > >>>>>>> will
> > >>>>>>>>>>>
> > >>>>>>>>>>>> allow new users to start in their projects.
> > >>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> MongoDB,
> > >>>>>>>
> > >>>>>>>> JDBC,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ... and some people started to learn the IO API
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> (Bounded/Unbouded
> > >>>>>>>
> > >>>>>>>> source, etc).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> (Source)
> > >>>>>>
> > >>>>>>> instead
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> of introducing a NewDoFn.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Regards
> > >>>>>>>>>>>>>>> JB
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hello Beam community,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> to
> > >>>>>
> > >>>>>> propose
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, which
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> allows
> > >>>>>>>>>>>
> > >>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> checkpointable
> > >>>>>>
> > >>>>>>> and
> > >>>>>>>>>>>
> > >>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> work
> > >>>>>>
> > >>>>>>> per
> > >>>>>>>>>>>
> > >>>>>>>>>>>> element.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This allows effectively replacing the current
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Bounded/UnboundedSource
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> APIs
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more scalable
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> and
> > >>>>>
> > >>>>>> composable
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> the rest of the Beam programming model, and enables many
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> use
> > >>>>>>
> > >>>>>>> cases
> > >>>>>>>>>>>
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> were previously difficult or impossible, as well as some
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> non-obvious new
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> use cases.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
> [BEAM-65]
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> and
> > >>>>>>
> > >>>>>>> some
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Beam
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> meetings, and now the whole thing is written up in a
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> document:
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>>>>>>>>>>         https://s.apache.org/splittable-do-fn
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Here are some things that become possible with
> Splittable
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> DoFn:
> > >>>>>>>
> > >>>>>>>> - Efficiently read a filepattern matching millions of
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> files
> > >>>>>
> > >>>>>> - Read a collection of files that are produced by an
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> earlier
> > >>>>>>
> > >>>>>>> step
> > >>>>>>>>>>>
> > >>>>>>>>>>>> in the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a storage
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> system
> > >>>>>>>
> > >>>>>>>> that can
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> export itself to files)
> > >>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> partitions"
> > >>>>>
> > >>>>>> DoFn
> > >>>>>>>>>>>
> > >>>>>>>>>>>> with a
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new records
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> in
> > >>>>>>
> > >>>>>>> a
> > >>>>>>>
> > >>>>>>>> while()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> loop
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> incrementally
> > >>>>>>>
> > >>>>>>>> returns
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
> > >>>>>>>>>>>>>>>> - Implement a parallel "count friends in common"
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> algorithm
> > >>>>>
> > >>>>>> (matrix
> > >>>>>>>>>>>
> > >>>>>>>>>>>> squaring) with good work balancing
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> reader
> > >>>>>
> > >>>>>> written
> > >>>>>>>>>>>
> > >>>>>>>>>>>> against
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> this API:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> > >>>>>>>>>>>>>>>>             ProcessContext context, OffsetRangeTracker
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> tracker)
> > >>>>>>>
> > >>>>>>>> {
> > >>>>>>>>>>>
> > >>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> > >>>>>>>>>>>>>>>>                 Kafka.subscribe(context.element().topic,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>  context.element().partition)) {
> > >>>>>>>
> > >>>>>>>>         consumer.seek(tracker.start());
> > >>>>>>>>>>>>>>>>         while (true) {
> > >>>>>>>>>>>>>>>>           ConsumerRecords<String, String> records =
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> consumer.poll(100ms);
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>           if (records == null) return done();
> > >>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String> record :
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> records)
> > >>>>>>>
> > >>>>>>>> {
> > >>>>>>>>>>>
> > >>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
> > >>>>>>>>>>>>>>>>               return
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> resume().withFutureOutputWatermark(record.timestamp());
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>             }
> > >>>>>>>>>>>>>>>>             context.output(record);
> > >>>>>>>>>>>>>>>>           }
> > >>>>>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>>>>       }
> > >>>>>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The document describes in detail the motivations behind
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> this
> > >>>>>>
> > >>>>>>> feature,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines an
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> incremental
> > >>>>>>>
> > >>>>>>>> delivery
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> plan.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [new-do-fn]
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> and is
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [beam-state].
> > >>>>>>
> > >>>>>>>
> > >>>>>>>>>>>>>>>> Please take a look and comment!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > >>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> > >>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> Jean-Baptiste Onofré
> > >>>>>>>>>>>>>>> jbono...@apache.org <javascript:;>
> > >>>>>>>>>>>>>>> http://blog.nanthrax.net
> > >>>>>>>>>>>>>>> Talend - http://www.talend.com
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > >>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > >>>>>> twiiter: @itmdata <
> > http://twitter.com/intent/user?screen_name=itmdata
> > >>>>>> >
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>> --
> > >>> Jean-Baptiste Onofré
> > >>> jbono...@apache.org
> > >>> http://blog.nanthrax.net
> > >>> Talend - http://www.talend.com
> > >>>
> > >>>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>

Reply via email to