I think there are two perspectives:
- from the programming model side, SDF will be in the SDK.
- from the execution layer side, SDF will be in the runner API, and leveraged
by the runners.
The runners will use the runner API: the SDF trackers, etc. will be
common to all runners. Then, each runner will have to provide a
translator for SDF. Autoscaling, dynamic rebalancing, etc. depend on
both the runner and the runtime engine. If the runtime engine
doesn't support such features, then the runner can't do magic ;)
Right now, only the Google Cloud Dataflow runner leverages all the
autoscaling features, etc., as the Google Cloud Dataflow platform fully supports them.
Regards
JB
On 08/29/2016 05:13 PM, Ovidiu-Cristian MARCU wrote:
Thank you, JB!
It will be helpful to have a structured access to the technical discussions.
Looking into the Splittable DoFn proposal [1], the authors point out what I
think is the current main design issue:
Source objects cannot be produced by the pipeline at runtime.
You are probably familiar with Husky [4], where they discuss dynamic
object creation; maybe the semantics are different,
but to me it seems to tackle the same problem. Could their approach then fit into
the Beam model?
I wonder if they could aspire to be a runner of Beam; I would appreciate any
remarks on this.
Is this proposal mainly related to supporting autoscaling and dynamic work
rebalancing as described in [2] & [3]?
Thank you and I am sorry if I interrupted your discussion on the proposal.
[1] https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#
[2] https://cloud.google.com/blog/big-data/2016/03/comparing-cloud-dataflow-autoscaling-to-spark-and-hadoop
[3] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
[4] http://www.husky-project.com/ ; http://www.vldb.org/pvldb/vol9/p420-yang.pdf
Best,
Ovidiu
On 29 Aug 2016, at 14:54, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi Ovidiu,
We had a "Technical Discussions" link on the website menu, but I can't see it anymore
(I just see "Technical Vision").
It listed all the documents under discussion.
I agree that we should have an area where we store all "Technical Discussion
Documents".
Let me discuss with Frances about that.
Thanks !
Regards
JB
On 08/29/2016 02:50 PM, Ovidiu-Cristian MARCU wrote:
Hi everyone
Is there any repository where one can track all proposals, something like Flink
does with this wiki [1]?
[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
Thanks
Ovidiu
On 29 Aug 2016, at 12:01, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi Aljoscha,
Indeed, it's something we discussed during our call.
AFAIU, it's one function of the tracker. When calling the tracker's tryClaim (with an offset,
partition id, or any kind of tracked "ID"), if the claim is not possible, then
we will update the watermark.
So the tracker is useful both to determine the "split" and to deal with the
watermark.
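A minimal sketch of that reading, assuming a tracker over an offset range [start, end) that remembers the timestamp of the first record it refuses. The class and method names here are hypothetical, not the proposal's API; in the proposal's Kafka example the DoFn itself reports the watermark when tryClaim fails, so treat this only as an illustration of the idea.

```java
import java.util.OptionalLong;

/** Hypothetical sketch, not Beam's API: a tracker over the offset range [start, end). */
final class ClaimSketchTracker {
    private final long end;            // exclusive upper bound of the restriction
    private long lastClaimed;          // start - 1 until the first successful claim
    private OptionalLong watermark = OptionalLong.empty();

    ClaimSketchTracker(long start, long end) {
        if (end < start) {
            throw new IllegalArgumentException("end must be >= start");
        }
        this.end = end;
        this.lastClaimed = start - 1;
    }

    /**
     * Tries to claim an offset. A refused claim means the offset lies outside
     * the restriction; the record's timestamp is then kept as the watermark
     * for output that will be produced when the work is resumed.
     */
    boolean tryClaim(long offset, long timestampMillis) {
        if (offset <= lastClaimed) {
            throw new IllegalArgumentException("offsets must be claimed in increasing order");
        }
        if (offset >= end) {
            watermark = OptionalLong.of(timestampMillis);
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Empty until a claim has been refused. */
    OptionalLong watermark() {
        return watermark;
    }

    public static void main(String[] args) {
        ClaimSketchTracker tracker = new ClaimSketchTracker(0, 2);
        System.out.println(tracker.tryClaim(0, 1000L));
        System.out.println(tracker.tryClaim(1, 2000L));
        System.out.println(tracker.tryClaim(2, 3000L));
        System.out.println(tracker.watermark());
    }
}
```

The same object answers both questions in this sketch: where the current piece of work ends (the "split") and how far event time can be assumed to have advanced.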
Regards
JB
On 08/29/2016 11:55 AM, Aljoscha Krettek wrote:
Hi,
I have another question about this: currently, unbounded sources have
special logic for determining the watermark and the system periodically
asks the sources for the current watermark. As I understood it, watermarks
are only "generated" at the sources. How will this work when sources are
implemented as a combination of DoFns and SplittableDoFns? Will
SplittableDoFns be asked for a watermark? Does this mean that watermarks
can then be "generated" at any operation?
Cheers,
Aljoscha
On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
Hi JB,
Yes, I'm assuming you're referring to the "magic" part on the transform
expansion diagram. This is indeed runner-specific, and timers+state are
likely the simplest way to do this for an SDF that does an unbounded amount of
work.
On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Anyway, from a runner perspective, we will have some kind of API (part of the
Runner API) to "orchestrate" the SDF as we discussed during the call,
right ?
Regards
JB
On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.
Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".
Think of it this way, using the Kafka example: we'll expand it into a
transform:
(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
- R is the OffsetRange restriction, which in this case will always be of
the form [startOffset, inf).
- there'll be just 1 value per key, but we use GBK just to get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.
Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? Here's how.
The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code) producing a residual
restriction R1' (basically a new start offset), put R1' into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into the
state of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.
So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already do
- state and timers, but no new concept of SDF executor (and consequently no
necessity to choose/tune how many you need).
Makes sense?
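The scheduling described above can be sketched as a single-threaded simulation. This is only an illustration under stated assumptions: the per-key state is a plain map from key to residual start offset, the "timers" are a FIFO queue of keys, a checkpoint is forced after a fixed batch of offsets, and each fake partition has a finite number of records so that the loop terminates (a real Kafka partition would keep resuming). None of the names below are Beam APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical single-threaded simulation of checkpoint/resume via per-key state and timers. */
final class CooperativeSdfSketch {

    /**
     * partitionSizes maps a key (K1, K2, ...) to the number of records in its fake partition.
     * Each key runs for at most batchSize offsets, then stores its residual start and
     * sets a timer to resume; a key with nothing left is not resumed and its state is dropped.
     */
    static List<String> run(Map<String, Long> partitionSizes, int batchSize) {
        Map<String, Long> state = new LinkedHashMap<>();  // per-key state: residual start offset
        Queue<String> timers = new ArrayDeque<>();         // pending "resume" timers, fired in order
        List<String> output = new ArrayList<>();

        // Every key starts with the restriction [0, inf).
        for (String key : partitionSizes.keySet()) {
            state.put(key, 0L);
            timers.add(key);
        }

        while (!timers.isEmpty()) {
            String key = timers.remove();                  // a timer fires in the context of this key
            long start = state.get(key);
            long size = partitionSizes.get(key);
            long stop = Math.min(start + batchSize, size); // checkpoint after batchSize offsets
            for (long offset = start; offset < stop; offset++) {
                output.add(key + "@" + offset);
            }
            if (stop < size) {
                state.put(key, stop);                      // residual restriction [stop, inf)
                timers.add(key);                           // set a timer to resume later
            } else {
                state.remove(key);                         // "do not resume": state is garbage collected
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("K1", 3L);
        sizes.put("K2", 2L);
        System.out.println(run(sizes, 2));
    }
}
```

With two keys and a batch size of 2, the single thread interleaves K1 and K2 rather than reading one partition to completion first, which is the point of the cooperative scheme: no dedicated executor per partition is needed.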
On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.
In the context of replacing sources by a combination of two SDFs, how do you
determine how many "SDF executor" instances you need downstream? For the
sake of argument assume that both SDFs are executed with parallelism 1 (or
one per worker). Now, if you have a file source that reads from a static
set of files the first SDF would emit the filenames while the second SDF
would receive the filenames and emit their contents. This works well and
the downstream SDF can process one filename after the other. Now, think of
something like a Kafka source. The first SDF would emit the partitions (say
4 partitions, in this example) and the second SDF would be responsible for
reading from a topic and emitting elements. Reading from one topic never
finishes so you can't process the topics in series. I think you would need
to have 4 downstream "SDF executor" instances. The question now is: how do
you determine whether you are in the first or the second situation?
Probably I'm just overlooking something and this is already dealt with
somewhere... :-)
Cheers,
Aljoscha
On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:
Hello,
Thanks for the notes both Dan and Eugene, and for taking the time to do the
presentation and answer our questions.
I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).
https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8
Anyway I am far from an expert on Flink, but probably the Flink guys can
give their opinion about this and refer to a more precise document than the
ones I mentioned.
Thanks again,
Ismaël
On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Great summary Eugene and Dan.
And thanks again for the details, explanation, and discussion.
Regards
JB
On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
Thanks for attending, everybody!
Here are meeting notes (thanks Dan!).
Q: Will SplittableDoFn enable better repartitioning of the input/output
data?
A: Not really; repartitioning is orthogonal to SDF.
Current Source API suffers from lack of composition and scalability because
we treat sources too much as metadata, not enough as data.
Q (slide with transform expansion): Who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions will
require collaboration with the runner.
Q: How does the runner interact with the DoFn to control the restrictions?
Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object that exists purely on the
worker while executing a single partition, and interacts with the worker
harness part of the runner. Not to be confused with the centralized job
tracker (master) - completely unrelated. Worker harness, of course,
interacts with the master in some relevant ways (e.g. Dataflow master can
tell "you're a straggler, you should split").
Q: Is this a new DoFn subclass, or how will this integrate with the
existing code?
A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-do-fn)
- just another optional parameter of type RestrictionTracker to
processElement() which is dynamically bound via reflection, so fully
backward/forward compatible, and looks to users like a regular DoFn.
Q: Why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
rebalancing) requires a uniform way to represent progress through different
sources.
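To make the fractionClaimed answer concrete, here is a small sketch for one kind of restriction, an offset range [start, end). The method name mirrors the term used in the notes, but the signature and the formula are assumptions for illustration, not the proposal's definition.

```java
/** Hypothetical sketch: progress through an offset range [start, end) as a number in 0..1. */
final class FractionClaimedSketch {

    /**
     * lastClaimed is the highest offset claimed so far, or any value below start
     * if nothing has been claimed yet.
     */
    static double fractionClaimed(long start, long end, long lastClaimed) {
        if (end <= start) {
            throw new IllegalArgumentException("range must be non-empty");
        }
        if (lastClaimed < start) {
            return 0.0;
        }
        // Offsets start..lastClaimed inclusive are done; clamp so the result never exceeds 1.
        long claimed = Math.min(lastClaimed, end - 1) - start + 1;
        return (double) claimed / (double) (end - start);
    }

    public static void main(String[] args) {
        System.out.println(fractionClaimed(0, 100, 24));
        System.out.println(fractionClaimed(10, 20, 19));
    }
}
```

A file reader would compute its fraction from byte positions and a key-range scanner from key space, but because every tracker reports the same unitless quantity, job-wide logic can compare them without knowing what the underlying restriction means.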
Q: Spark runner is microbatch-based, so this seems to map well onto
checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also
microbatch-based. The way SDF interacts with a runner will be very similar
to how a Bounded/UnboundedSource interacts with a runner.
Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IO's as PTransforms and their implementation
under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
Datastore was recently refactored from BoundedSource to ParDo (ended up
simpler and more scalable), transparently to users.
Q: What's the timeline; what to do with the IOs currently in development?
A: Timeline is O(months). Keep doing what you're doing and working on top
of Source APIs when necessary and simple ParDo's otherwise.
Q: What's the impact for the runner writers?
A: Tentatively expected that most of the code for running an SDF will be
common to runners, with some amount of per-runner glue code, just like
GBK/windowing/triggering. Impact on Dataflow runner is larger since it
supports dynamic rebalancing in batch mode and this is the hardest part,
but for other runners shouldn't be too hard.
JB: Talend has people who can help with this: e.g. help integrate into
Spark runner, refactor IOs etc. Amit also willing to chat about supporting
SDF in Spark runner.
Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
send a link.
On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi Eugene,
thanks for the reminder.
Just to prepare some topics for the call, please find some points:
1. Using SDF, what would be the "packaging" of the IO ? It sounds to me
that we can keep the IO packaging style (using with* setters for the IO
configuration) and replace PTransform, Source, Reader, ... directly with
SDF. Correct ?
2. What's your plan in terms of release to include SDF ? We have several
IOs in preparation and I wonder if it's worth starting to use the new
SDF API or not.
3. What's the impact for the runner writers ? The runners will have to
support SDF, which could be tricky depending on the execution engine. In
the worst case where the runner can't fully support SDF, does it mean
that most of our IOs will be useless ?
Just my dumb topics ;)
Thanks,
See you at 8am !
Regards
JB
On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
Hello everybody,
Just a reminder:
The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST; to join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.
Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc to use as a guide when conducting the meeting,
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
.
I will post notes from the meeting on this thread afterwards.
Thanks, looking forward.
On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid> wrote:
This is pretty cool! I'll be there too. (unless the hangout gets too full
-- if so, I'll drop out in favor of others who aren't lucky enough to get
to talk to Eugene all the time.)
On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com> wrote:
+1 I'll join
On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com> wrote:
+ 1, me2
On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:
+1 as in I'll join ;-)
On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi
Unfortunately I will be in Ireland on August 15th. What about Friday 19th ?
Regards
JB
On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:
Hi JB,
Sounds great, does the suggested time over videoconference work for you?
On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi Eugene
May we talk together next week ? I like the proposal. I would just need
some details for my understanding.
Thanks
Regards
JB
On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:
Hi JB,
What are your thoughts on this?
I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.
How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn -
I confirmed that it can be joined without being logged into a Google
account)
Who'd be interested in attending, and does this time/date work for people?
On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com> wrote:
Hi JB, thanks for reading and for your comments!
It sounds like you are concerned about continued support for existing IO's
people have developed, and about backward compatibility?
We do not need to remove the Source API, and all existing Source-based
connectors will continue to work [though the document proposes at some
point to make Read.from(Source) translate to a wrapper SDF under the
hood, to exercise the feature more and to make sure that it is strictly
more powerful - but this is an optional implementation detail].
Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.
To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:
- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given appropriate
implementation, would solve most of the scalability and composability
issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
+ Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDo's as well: e.g. ability to report progress when processing a very
heavy element, or ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B: isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources
and ParDo's, in particular: can we bring the magic of sources to ParDo's
but without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became clear that it
is strictly more general than sources; at least, in the respect that
sources have to produce output, while DoFn's don't: an SDF may very well
produce no output at all, and simply perform a side effect in a
parallel/resumable way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, what the relation between
SDF and DF should be, etc. They culminated in the current proposal. The
proposal comes at a time when a couple of key ingredients are (almost)
ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
proposal to enable unbounded work per element.
To put it shortly:
- Yes, we will support existing Source connectors, and will support
writing new ones, possibly forever. There is no interference with current
users of Source.
- The new API is an attempt to improve the Source API, taken to its
logical limit where it turns out that users' goals can be accomplished
easier and more generically entirely within ParDo's.
Let me know what you think, and thanks again!
On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
Hi Eugene,
Just a question: why is it in DoFn and not an improvement of Source ?
If I understand correctly, it means that we will have to refactor all
existing IO: basically, what you propose is to remove all Source to
replace with NewDoFn.
I'm concerned with this approach, especially in terms of timing: clearly,
the IO is the area where we have to move forward in Beam as it will
allow new users to start in their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
... and some people started to learn the IO API (Bounded/Unbounded
source, etc).
I think it would make more sense to enhance the IO API (Source) instead
of introducing a NewDoFn.
What are your thoughts for IO writer like me ? ;)
Regards
JB
On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
Hello Beam community,
We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.
This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.
This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:
https://s.apache.org/splittable-do-fn
Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while()
loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing
Here is the meaningful part of a hypothetical Kafka reader written against
this API:
ProcessContinuation processElement(
    ProcessContext context, OffsetRangeTracker tracker) {
  try (KafkaConsumer<String, String> consumer =
      Kafka.subscribe(context.element().topic,
                      context.element().partition)) {
    // Start reading at the beginning of the current restriction.
    consumer.seek(tracker.start());
    while (true) {
      // 100ms is pseudocode for the poll timeout.
      ConsumerRecords<String, String> records = consumer.poll(100ms);
      if (records == null) return done();
      for (ConsumerRecord<String, String> record : records) {
        // A refused claim means the runner has checkpointed or split the range.
        if (!tracker.tryClaim(record.offset())) {
          return resume().withFutureOutputWatermark(record.timestamp());
        }
        context.output(record);
      }
    }
  }
}
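Since the reader above targets a proposed API and cannot be run as-is, here is a self-contained sketch of the same loop shape against an in-memory list standing in for a partition. Everything in it (ReaderLoopSketch, OffsetTracker, Continuation) is a hypothetical stand-in chosen for illustration; it only shows how the result of tryClaim decides between "done" and "resume".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory analogue of the reader loop; not the proposed API. */
final class ReaderLoopSketch {

    enum Continuation { DONE, RESUME }

    /** Minimal tracker over the offset range [start, end). */
    static final class OffsetTracker {
        final long start;
        final long end;

        OffsetTracker(long start, long end) {
            if (start < 0 || end < start) {
                throw new IllegalArgumentException("need 0 <= start <= end");
            }
            this.start = start;
            this.end = end;
        }

        boolean tryClaim(long offset) {
            return offset < end;
        }
    }

    /**
     * Outputs every record whose offset can be claimed. Returns RESUME when a claim is
     * refused (more work remains beyond the restriction) and DONE when the fake
     * partition has no more records.
     */
    static Continuation process(List<String> partition, OffsetTracker tracker, List<String> output) {
        for (long offset = tracker.start; offset < partition.size(); offset++) {
            if (!tracker.tryClaim(offset)) {
                return Continuation.RESUME;
            }
            output.add(partition.get((int) offset));
        }
        return Continuation.DONE;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c", "d");
        List<String> output = new ArrayList<>();
        System.out.println(process(partition, new OffsetTracker(1, 3), output) + " " + output);
    }
}
```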
The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.
The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].
Please take a look and comment!
Thanks.
[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
--
Thanks,
Andrew
Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>