Great !!!!!

Let me experiment a bit in SDF (especially in the IO).

I'll keep you posted.

Regards
JB

On 10/13/2016 02:55 AM, Eugene Kirpichov wrote:
Hey all,

An update: https://github.com/apache/incubator-beam/pull/896 has been
merged, laying groundwork and adding support for splittable DoFn to the
in-memory runner.

What this PR does:
- It defines an API, in full accordance with the proposal discussed on this
thread.
- It adds a mostly runner-agnostic expansion of the ParDo transform for a
splittable DoFn, with one runner-specific primitive transform that needs to
be overridden by every runner.
- It overrides said transform in the in-memory runner, so this works
end-to-end in the in-memory runner.
- All this code is covered by tests (unit and integration
@RunnableOnService) and appears to work properly in combination with the
rest of the Beam model: e.g., inputs to a splittable DoFn can be windowed,
and their windows and timestamps are transparently propagated.

Caveats:
- The API is marked @Experimental, but this is an understatement: it is
assumed to be in flux and is not intended to be used yet. Overwhelmingly
likely, it *will* change in incompatible ways. DO NOT write pipelines with
this transform yet.
- It only works in the in-memory runner: the vast majority of code is
runner-agnostic, but a central runner-specific primitive transform is only
overridden by the in-memory runner.

My immediate next plan is to make this work in the Cloud Dataflow streaming
runner (since this is the runner I'm most familiar with), in order to get
experience with what kind of runner hooks are needed and to put the API in
shape for adding hooks for other runners - and then work either myself or
with the community on making it work in other runners too. Once all runners
sufficiently support a particular subset of features, we can start
transitioning some connectors or writing new ones using that subset (I
expect that streaming connectors will come first).

Additionally, the Python SDK is considering using Splittable DoFn as the
*only* API for streaming sources (right now it doesn't have any API for
that, so there are no compatibility concerns). No implementation work has
happened yet, but it seems like a good idea.

On Tue, Aug 30, 2016 at 1:45 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Thanks for the explanation Eugene and JB.

By the way, I'm not trying to find holes in this, I really like the
feature. I just sometimes wonder how a specific thing might be implemented
with this.

On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Hi Aljoscha,

The watermark reporting is done via
ProcessContinuation.futureOutputWatermark, at the granularity of returning
from individual processElement() calls: you return from the call and give
a watermark on your future output. We assume that updating the watermark is
sufficient at a per-bundle level (or, if not, that you can make bundles
small enough), because that's the same level at which state changes,
timers etc. are committed.
It can be implemented by setting a per-key watermark hold and updating it
when each call for this element returns. That's the way it is implemented
in my current prototype, https://github.com/apache/incubator-beam/pull/896
(see SplittableParDo.ProcessFn).
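
As a rough illustration of the per-key watermark hold described above, here is a toy sketch. It is not the code from the PR: the class `WatermarkHoldSketch` and its methods are hypothetical names, timestamps are plain `long` values, and it assumes the output watermark is the minimum of the input watermark and all outstanding holds.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-key watermark holds; hypothetical, not Beam code. */
final class WatermarkHoldSketch {
    // One hold per key that still has a pending "resume".
    private final Map<String, Long> holds = new HashMap<>();

    /** A processElement call for this key returned "resume" with a future output watermark. */
    void onResume(String key, long futureOutputWatermark) {
        holds.put(key, futureOutputWatermark);
    }

    /** A processElement call for this key returned "done": release its hold. */
    void onDone(String key) {
        holds.remove(key);
    }

    /** Assumed rule: output watermark = min(input watermark, all outstanding holds). */
    long outputWatermark(long inputWatermark) {
        long watermark = inputWatermark;
        for (long hold : holds.values()) {
            watermark = Math.min(watermark, hold);
        }
        return watermark;
    }
}
```

Each call that returns "resume" replaces the previous hold for its key, so the output watermark can only be advanced by the keys that are furthest behind.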

On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,
I have another question about this: currently, unbounded sources have
special logic for determining the watermark and the system periodically
asks the sources for the current watermark. As I understood it, watermarks
are only "generated" at the sources. How will this work when sources are
implemented as a combination of DoFns and SplittableDoFns? Will
SplittableDoFns be asked for a watermark? Does this mean that watermarks
can then be "generated" at any operation?

Cheers,
Aljoscha

On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Hi JB,

Yes, I'm assuming you're referring to the "magic" part on the transform
expansion diagram. This is indeed runner-specific, and timers+state are
likely the simplest way to do this for an SDF that does unbounded amount of
work.

On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Anyway, from a runner perspective, we will have a kind of API (part of the
Runner API) to "orchestrate" the SDF as we discussed during the call,
right?

Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
  - R is the OffsetRange restriction which in this case will always be of
the form [startOffset, inf).
  - there'll be just 1 value per key, but we use GBK to just get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? Here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code) producing a residual
restriction R1' (basically a new start offset), put R1' into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into the
state of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already
do - state and timers, but no new concept of SDF executor (and consequently
no necessity to choose/tune how many you need).

Makes sense?
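
The checkpoint/resume loop walked through above can be simulated in a few lines. This is only a toy, single-threaded sketch under stated assumptions, not runner code: `CooperativeSdfSketch`, `CHECKPOINT_AFTER` and `run` are hypothetical names, each partition is modelled as a finite in-memory list (so "the partition goes away" means the list is exhausted), the per-key state is just the start offset of the residual restriction, and a FIFO queue stands in for the runner's timers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy simulation of timer-driven checkpoint/resume; hypothetical, not Beam code. */
final class CooperativeSdfSketch {
    /** Checkpoint after this many records have been output in one call. */
    static final int CHECKPOINT_AFTER = 3;

    /** Simulates at most maxCalls processElement calls on a single thread. */
    static List<String> run(Map<String, List<String>> partitions, int maxCalls) {
        Map<String, Integer> state = new HashMap<>(); // per-key residual start offset
        Queue<String> timers = new ArrayDeque<>();    // pending "resume" timers
        List<String> output = new ArrayList<>();
        for (String key : partitions.keySet()) {
            state.put(key, 0);   // initial restriction [0, inf)
            timers.add(key);
        }
        int calls = 0;
        while (!timers.isEmpty() && calls++ < maxCalls) {
            String key = timers.remove();             // timer fires in the context of key
            List<String> records = partitions.get(key);
            int offset = state.get(key);
            int emitted = 0;
            while (offset < records.size() && emitted < CHECKPOINT_AFTER) {
                output.add(key + ":" + records.get(offset));
                offset++;
                emitted++;
            }
            if (offset >= records.size()) {
                state.remove(key);       // "do not resume": state is dropped, no timer
            } else {
                state.put(key, offset);  // store residual restriction [offset, inf)
                timers.add(key);         // set a timer to resume later
            }
        }
        return output;
    }
}
```

With K1 holding four records and K2 holding two, the single thread outputs three records of K1, then both records of K2, then returns to K1 for its last record, which is the interleaving described in the walkthrough.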

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of two SDFs, how do
you determine how many "SDF executor" instances you need downstream? For
the sake of argument assume that both SDFs are executed with parallelism 1
(or one per worker). Now, if you have a file source that reads from a
static set of files the first SDF would emit the filenames while the second
SDF would receive the filenames and emit their contents. This works well
and the downstream SDF can process one filename after the other. Now, think
of something like a Kafka source. The first SDF would emit the partitions
(say 4 partitions, in this example) and the second SDF would be responsible
for reading from a topic and emitting elements. Reading from one topic
never finishes so you can't process the topics in series. I think you would
need to have 4 downstream "SDF executor" instances. The question now is:
how do you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with
somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com>
wrote:

Hello,

Thanks for the notes both Dan and Eugene, and for taking the time to do the
presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8

Anyway I am far from an expert on Flink, but probably the Flink guys can
give their opinion about this and refer to a more precise document than the
ones I mentioned.

Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Great summary Eugene and Dan.

And thanks again for the details, explanation, and discussion.

Regards
JB


On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:

Thanks for attending, everybody!

Here are meeting notes (thanks Dan!).

Q: Will SplittableDoFn enable better repartitioning of the input/output
data?
A: Not really; repartitioning is orthogonal to SDF.

Current Source API suffers from lack of composition and scalability because
we treat sources too much as metadata, not enough as data.

Q (slide with transform expansion): who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions will
require collaboration with the runner.

Q: How does the runner interact with the DoFn to control the restrictions?
Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object, that exists purely on the
worker while executing a single partition, and interacts with the worker
harness part of the runner. Not to be confused with the centralized job
tracker (master) - completely unrelated. Worker harness, of course,
interacts with the master in some relevant ways (e.g. Dataflow master can
tell "you're a straggler, you should split").

Q: Is this a new DoFn subclass, or how will this integrate with the
existing code?
A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-dofn)
- just another optional parameter of type RestrictionTracker to
processElement() which is dynamically bound via reflection, so fully
backward/forward compatible, and looks to users like a regular DoFn.

Q: why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
rebalancing) requires a uniform way to represent progress through different
sources.
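
To make the "number in 0..1" concrete, here is a minimal sketch of a tracker over a bounded offset range. It is an illustration under assumptions, not the proposed RestrictionTracker API: the class name `OffsetRangeTrackerSketch` is hypothetical, the range is a half-open `[start, end)` of `long` offsets, and progress is taken to be linear in the claimed offsets.

```java
/** Toy tracker over a bounded offset range [start, end); hypothetical, not Beam code. */
final class OffsetRangeTrackerSketch {
    private final long start;
    private final long end;     // exclusive
    private long lastClaimed;   // start - 1 until the first successful claim

    OffsetRangeTrackerSketch(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("range must be non-empty");
        }
        this.start = start;
        this.end = end;
        this.lastClaimed = start - 1;
    }

    /** Claims an offset; returns false once the offset falls outside the range. */
    synchronized boolean tryClaim(long offset) {
        if (offset <= lastClaimed) {
            throw new IllegalArgumentException("offsets must be claimed in increasing order");
        }
        if (offset >= end) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Progress through the range as a fraction in [0, 1]. */
    synchronized double fractionClaimed() {
        return (double) (lastClaimed + 1 - start) / (double) (end - start);
    }
}
```

Because the result is a plain fraction, a runner could compare progress across a file reader and, say, a key-range scan without knowing what an "offset" means for either.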

Q: Spark runner is microbatch-based, so this seems to map well onto
checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also
microbatch-based. The way SDF interacts with a runner will be very similar
to how a Bounded/UnboundedSource interacts with a runner.

Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IO's as PTransforms and their implementation
under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
Datastore was recently refactored from BoundedSource to ParDo (ended up
simpler and more scalable), transparently to users.

Q: What's the timeline; what to do with the IOs currently in development?
A: Timeline is O(months). Keep doing what you're doing and working on top
of Source APIs when necessary and simple ParDo's otherwise.

Q: What's the impact for the runner writers?
A: Tentatively expected that most of the code for running an SDF will be
common to runners, with some amount of per-runner glue code, just like
GBK/windowing/triggering. Impact on Dataflow runner is larger since it
supports dynamic rebalancing in batch mode and this is the hardest part,
but for other runners shouldn't be too hard.

JB: Talend has people who can help with this: e.g. help integrate into
Spark runner, refactor IOs etc. Amit also willing to chat about supporting
SDF in Spark runner.

Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
send a link.


On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

thanks for the reminder.

Just to prepare some topics for the call, please find some points:

1. Using SDF, what would be the "packaging" of the IO? It sounds to me
that we can keep the IO packaging style (using with* setters for the IO
configuration) and replace PTransform, Source, Reader, ... directly with
SDF. Correct?

2. What's your plan in terms of release to include SDF? We have several
IOs in preparation and I wonder if it's worth starting to use the new
SDF API or not.

3. What's the impact for the runner writers? The runners will have to
support SDF, which could be tricky depending on the execution engine. In
the worst case where the runner can't fully support SDF, does it mean
that most of our IOs will be useless?

Just my dumb topics ;)

Thanks,
See you at 8am !

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:

Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc:
https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc to use as a guide when conducting the meeting,
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
.

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
wrote:

This is pretty cool! I'll be there too. (unless the hangout gets too full
-- if so, I'll drop out in favor of others who aren't lucky enough to get
to talk to Eugene all the time.)

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com>
wrote:

+ 1, me2

On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi

Unfortunately I will be in Ireland on August 15th. What about Friday 19th?

Regards
JB

On Aug 11, 2016, at 23:22, Eugene Kirpichov <kirpic...@google.com.INVALID>
wrote:

Hi JB,

Sounds great, does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene

May we talk together next week? I like the proposal. I would just need
some details for my understanding.

Thanks
Regards
JB

On Aug 11, 2016, at 19:46, Eugene Kirpichov <kirpic...@google.com.INVALID>
wrote:

Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for existing IO's
people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based
connectors will continue to work [though the document proposes at some
point to make Read.from(Source) to translate to a wrapper SDF under the
hood, to exercise the feature more and to make sure that it is strictly
more powerful - but this is an optional implementation detail].

Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.

To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:
- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given appropriate
implementation, would solve most of the scalability and composability
issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
+ Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDo's as well: e.g. ability to report progress when processing a very
heavy element, or ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B: isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources
and ParDo's, in particular: can we bring the magic of sources to ParDo's
but without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became clear that it
is strictly more general than sources; at least, in the respect that
sources have to produce output, while DoFn's don't: an SDF may very well
produce no output at all, and simply perform a side effect in a
parallel/resumable way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, what the relation between
SDF and DF should be, etc. They culminated in the current proposal. The
proposal comes at a time when a couple of key ingredients are (almost)
ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
proposal to enable unbounded work per element.

To put it shortly:
- Yes, we will support existing Source connectors, and will support
writing new ones, possibly forever. There is no interference with current
users of Source.
- The new API is an attempt to improve the Source API, taken to its
logical limit where it turns out that users' goals can be accomplished
easier and more generically entirely within ParDo's.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all
existing IO: basically, what you propose is to remove all Source to
replace with NewDoFn.

I'm concerned with this approach, especially in terms of timing: clearly,
the IO is the area where we have to move forward in Beam as it will
allow new users to start in their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
... and some people started to learn the IO API (Bounded/Unbounded
source, etc).

I think it would make more sense to enhance the IO API (Source) instead
of introducing a NewDoFn.

What are your thoughts for IO writers like me? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

        https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while()
loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

    ProcessContinuation processElement(
            ProcessContext context, OffsetRangeTracker tracker) {
      try (KafkaConsumer<String, String> consumer =
              Kafka.subscribe(context.element().topic,
                              context.element().partition)) {
        consumer.seek(tracker.start());
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(100ms);
          if (records == null) return done();
          for (ConsumerRecord<String, String> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              return resume().withFutureOutputWatermark(record.timestamp());
            }
            context.output(record);
          }
        }
      }
    }

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

