Anyway, from a runner perspective, we will have kind of API (part of the Runner API) to "orchestrate" the SDF as we discussed during the call, right ?

Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
  - R is the OffsetRange restriction which in this case will be always of
the form [startOffset, inf).
  - there'll be just 1 value per key, but we use GBK to just get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code) producing a residual
restriction R1' (basically a new start timestamp), put R11 into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into state
of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already do
- state and timers, but no new concept of SDF executor (and consequently no
necessity to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of to SDFs, how do you
determine how many "SDF executor" instances you need downstream? For the
sake of argument assume that both SDFs are executed with parallelism 1 (or
one per worker). Now, if you have a file source that reads from a static
set of files the first SDF would emit the filenames while the second SDF
would receive the filenames and emit their contents. This works well and
the downstream SDF can process one filename after the other. Now, think of
something like a Kafka source. The first SDF would emit the partitions (say
4 partitions, in this example) and the second SDF would be responsible for
reading from a topic and emitting elements. Reading from one topic never
finishes so you can't process the topics in series. I think you would need
to have 4 downstream "SDF executor" instances. The question now is: how do
you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with
somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:

Hello,

Thanks for the notes both Dan and Eugene, and for taking the time to do
the
presentation and  answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I
suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).



https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8

Anyway I am far from an expert on flink, but probably the flink guys can
give their opinion about this and refer to a more precise document that
the
ones I mentioned..

​Thanks again,
Ismaël​

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Great summary Eugene and Dan.

And thanks again for the details, explanation, and discussion.

Regards
JB


On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:

Thanks for attending, everybody!

Here are meeting notes (thanks Dan!).

Q: Will SplittableDoFn enable better repartitioning of the
input/output
data?
A: Not really; repartitioning is orthogonal to SDF.

Current Source API suffers from lack of composition and scalability
because
we treat sources too much as metadata, not enough as data.

Q(slide with transform expansion): who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions
will
require collaboration with the runner.

Q: How does the runner interact with the DoFn to control the
restrictions?
Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object, that exists purely on
the
worker while executing a single partition, and interacts with the
worker
harness part of the runner. Not to be confused with the centralized
job
tracker (master) - completely unrelated. Worker harness, of course,
interacts with the master in some relevant ways (e.g. Dataflow master
can
tell "you're a straggler, you should split").

Q: Is this a new DoFn subclass, or how will this integrate with the
existing code?
A: It's a feature of reflection-based DoFn (
https://s.apache.org/a-new-do
fn)
- just another optional parameter of type RestrictionTracker to
processElement() which is dynamically bound via reflection, so fully
backward/forward compatible, and looks to users like a regular DoFn.

Q: why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
rebalancing) requires a uniform way to represent progress through
different
sources.

Q: Spark runner is microbatch-based, so this seems to map well onto
checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also
microbatch-based. The way SDF interacts with a runner will be very
similar
to how a Bounded/UnboundedSource interacts with a runner.

Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IO's as PTransforms and their
implementation
under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
Datastore was recently refactored from BoundedSource to ParDo (ended
up
simpler and more scalable), transparently to users.

Q: What's the timeline; what to do with the IOs currently in
development?
A: Timeline is O(months). Keep doing what you're doing and working on
top
of Source APIs when necessary and simple ParDo's otherwise.

Q: What's the impact for the runner writers?
A: Tentatively expected that most of the code for running an SDF will
be
common to runners, with some amount of per-runner glue code, just like
GBK/windowing/triggering. Impact on Dataflow runner is larger since it
supports dynamic rebalancing in batch mode and this is the hardest
part,
but for other runners shouldn't be too hard.

JB: Talend has people who can help with this: e.g. help integrate into
Spark runner, refactor IOs etc. Amit also willing to chat about
supporting
SDF in Spark runner.

Ismael: There's a Flink proposal about dynamic rebalancing. Ismael
will
send a link.


On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net

wrote:

Hi Eugene,

thanks for the reminder.

Just to prepare some topics for the call, please find some points:

1. Using SDF, what would be the "packaging" of the IO ? It sounds to
me
that we can keep the IO packaging style (using with* setters for the
IO
configuration) and replace PTransform, Source, Reader, ... directly
with
SDF. Correct ?

2. What's your plan in term of release to include SDF ? We have
several
IOs in preparation and I wonder if it's worth to start to use the new
SDF API or not.

3. What's the impact for the runner writers ? The runners will have
to
support SDF, that could be tricky depending of the execution engine.
In
the worst case where the runner can't fully support SDF, does it mean
that most of our IOs will be useless ?

Just my dumb topics ;)

Thanks,
See you at 8am !

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:

Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to
join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc: https://s.apache.org/
splittable-do-fn
I also made some slides that are basically a trimmed-down version of
the
doc to use as a guide when conducting the meeting,

https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

.

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
<dhalp...@google.com.invalid

wrote:

This is pretty cool! I'll be there too. (unless the hangout gets too

full

-- if so, I'll drop out in favor of others who aren't lucky enough
to

get

to talk to Eugene all the time.)

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <

psaltis.and...@gmail.com>

wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <

apban...@cisco.com


wrote:

+ 1, me2




On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com

<javascript:;>>

wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov

<kirpic...@google.com.invalid


wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.

com/splittabledofn


On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <

j...@nanthrax.net

<javascript:;>>

wrote:

Hi

Unfortunately I will be in Ireland on August 15th. What about

Friday

19th ?


Regards
JB



On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:

Hi JB,

Sounds great, does the suggested time over videoconference
work

for

you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <

j...@nanthrax.net <javascript:;>>

wrote:

Hi Eugene

May we talk together next week ? I like the proposal. I
would

just

need

some details for my understanding.

Thanks
Regards
JB



On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:

Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain
more

about

this
proposal if necessary, since I understand it is a lot to

digest.


How about: Monday Aug 15, 8am-9am Pacific time, over
Hangouts?
(link:




https://staging.talkgadget.google.com/hangouts/_/google.

com/splittabledofn

-
I confirmed that it can be joined without being logged
into a

Google

account)

Who'd be interested in attending, and does this time/date
work

for

people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov

<kirpic...@google.com <javascript:;>>

wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support
for

existing

IO's

people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing

Source-based

connectors will continue to work [though the document

proposes

at

some

point to make Read.from(Source) to translate to a wrapper

SDF

under

the

hood, to exercise the feature more and to make sure that
it

is

strictly

more powerful - but this is an optional implementation

detail].


Perhaps the document phrases this too strongly -
"replacing

the

Source

API": a better phrasing would be "introducing a new API so

powerful

and

easy-to-use that hopefully people will choose it over the

Source

API

all

the time, even though they don't have to" :) And we can

discuss

whether or

not to actually deprecate/remove the Source API at some

point

down

the

road, once it becomes clear whether this is the case or
not.

To give more context: this proposal came out of
discussions

within

the SDK

team over the past ~1.5 years, before the Beam project

existed,

on

how to

make major improvements to the Source API; perhaps it will

clarify

things

if I give a history of the ideas discussed:
- The first idea was to introduce a

Read.from(PCollection<Source>)

transform while keeping the Source API intact - this, given

appropriate

implementation, would solve most of the scalability and

composability

issues of IO's. Then most connectors would look like :

ParDo<A,

Source<B>>

+ Read.from().
- Then we figured that the Source class is an unnecessary

abstraction, as

it simply holds data. What if we only had a Reader<S, B>

class

where

S is

the source type and B the output type? Then connectors
would

be

something

like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of
Source

are

useful to

ParDo's as well: e.g. ability to report progress when

processing a

very

heavy element, or ability to produce very large output in

parallel.

- The two previous bullets were already hinting that the

Read.using()

primitive might not be so special: it just takes S and

produces

B:

isn't

that what a ParDo does, plus some source magic, minus the

convenience

of

c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore

unifying

sources

and ParDo's, in particular: can we bring the magic of

sources

to

ParDo's

but without the limitations and coding inconveniences? And

this

is

how

SplittableDoFn was born: bringing source magic to a DoFn
by

providing

a

RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became

clear

that

it

is strictly more general than sources; at least, in the

respect

that

sources have to produce output, while DoFn's don't: an SDF

may

very

well

produce no output at all, and simply perform a side effect

in

a

parallel/resumable way.
- Then there were countless hours of discussions on
unifying

the

bounded/unbounded cases, on the particulars of RangeTracker

APIs

reconciling parallelization and checkpointing, what the

relation

between

SDF and DF should be, etc. They culminated in the current

proposal.

The

proposal comes at a time when a couple of key ingredients

are

(almost)

ready: NewDoFn to make SDF look like a regular DoFn, and
the

State/Timers

proposal to enable unbounded work per element.

To put it shortly:
- Yes, we will support existing Source connectors, and
will

support

writing new ones, possibly forever. There is no interference

with

current

users of Source.
- The new API is an attempt to improve the Source API,
taken

to

its

logical limit where it turns out that users' goals can be

accomplished

easier and more generically entirely within ParDo's.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré

<j...@nanthrax.net <javascript:;>>

wrote:

Hi Eugene,

Just a question: why is it in DoFn and note an
improvement

of

Source

?


If I understand correctly, it means that we will have to

refactore

all

existing IO: basically, what you propose is to remove all

Source

to

replace with NewDoFn.

I'm concern with this approach, especially in term of

timing:

clearly,

the IO is the area where we have to move forward in Beam
as

it

will

allow new users to start in their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra,

MongoDB,

JDBC,

... and some people started to learn the IO API

(Bounded/Unbouded

source, etc).

I think it would make more sense to enhance the IO API

(Source)

instead

of introducing a NewDoFn.

What are your thoughts for IO writer like me ? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like

to

propose

"Splittable DoFn" - a major generalization of DoFn, which

allows

processing

of a single element to be non-monolithic, i.e.

checkpointable

and

parallelizable, as well as doing an unbounded amount of

work

per

element.


This allows effectively replacing the current

Bounded/UnboundedSource

APIs

with DoFn's that are much easier to code, more scalable

and

composable

with

the rest of the Beam programming model, and enables many

use

cases

that

were previously difficult or impossible, as well as some

non-obvious new

use cases.

This proposal has been mentioned before in JIRA
[BEAM-65]

and

some

Beam

meetings, and now the whole thing is written up in a

document:


        https://s.apache.org/splittable-do-fn

Here are some things that become possible with
Splittable

DoFn:

- Efficiently read a filepattern matching millions of

files

- Read a collection of files that are produced by an

earlier

step

in the

pipeline (e.g. easily implement a connector to a storage

system

that can

export itself to files)
- Implement a Kafka reader by composing a "list

partitions"

DoFn

with a

DoFn that simply polls a consumer and outputs new records

in

a

while()

loop

- Implement a log tailer by composing a DoFn that

incrementally

returns

new

files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common"

algorithm

(matrix

squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka

reader

written

against

this API:

    ProcessContinuation processElement(
            ProcessContext context, OffsetRangeTracker

tracker)

{

      try (KafkaConsumer<String, String> consumer =
                Kafka.subscribe(context.element().topic,

 context.element().partition)) {

        consumer.seek(tracker.start());
        while (true) {
          ConsumerRecords<String, String> records =

consumer.poll(100ms);

          if (records == null) return done();
          for (ConsumerRecord<String, String> record :

records)

{

            if (!tracker.tryClaim(record.offset())) {
              return

resume().withFutureOutputWatermark(record.timestamp());

            }
            context.output(record);
          }
        }
      }
    }

The document describes in detail the motivations behind

this

feature,

the

basic idea and API, open questions, and outlines an

incremental

delivery

plan.

The proposed API builds on the reflection-based new DoFn

[new-do-fn]

and is

loosely related to "State and Timers for DoFn"

[beam-state].


Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state


--
Jean-Baptiste Onofré
jbono...@apache.org <javascript:;>
http://blog.nanthrax.net
Talend - http://www.talend.com








--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twiiter: @itmdata <
http://twitter.com/intent/user?screen_name=itmdata





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to