Hi Eugene,

Thanks for the reminder.

To prepare for the call, here are some points I would like to discuss:

1. With SDF, what would the "packaging" of an IO look like? It sounds to me like we can keep the current IO packaging style (using with* setters for the IO configuration) and replace PTransform, Source, Reader, ... directly with SDF. Correct?

2. What is your plan in terms of releases for including SDF? We have several IOs in preparation, and I wonder whether it is worth starting to use the new SDF API or not.

3. What is the impact on runner writers? The runners will have to support SDF, which could be tricky depending on the execution engine. In the worst case, where a runner can't fully support SDF, does it mean that most of our IOs will be useless?

Just my dumb topics ;)

Thanks,
See you at 8am!

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST. To join
the call, go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc:
https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc, to use as a guide when conducting the meeting:
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
wrote:

This is pretty cool! I'll be there too. (unless the hangout gets too full
-- if so, I'll drop out in favor of others who aren't lucky enough to get
to talk to Eugene all the time.)

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com>
wrote:

+1, me too




On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com>
wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi

Unfortunately, I will be in Ireland on August 15th. What about Friday the
19th?

Regards
JB



On Aug 11, 2016, at 23:22, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:
Hi JB,

Sounds great. Does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene

Could we talk next week? I like the proposal; I would just need some
details for my understanding.

Thanks
Regards
JB



On Aug 11, 2016, at 19:46, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:
Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for the existing
IO's people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based
connectors will continue to work [though the document proposes at some
point making Read.from(Source) translate to a wrapper SDF under the hood,
to exercise the feature more and to make sure that it is strictly more
powerful - but this is an optional implementation detail].

Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.

To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:
- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given an appropriate
implementation, would solve most of the scalability and composability
issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
+ Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDo's as well: e.g. the ability to report progress when processing a
very heavy element, or the ability to produce very large output in
parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B: isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources
and ParDo's, in particular: can we bring the magic of sources to ParDo's
but without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became clear that it
is strictly more general than sources; at least, in the respect that
sources have to produce output, while DoFn's don't: an SDF may very well
produce no output at all, and simply perform a side effect in a
parallel/resumable way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, what the relation between
SDF and DF should be, etc. They culminated in the current proposal. The
proposal comes at a time when a couple of key ingredients are (almost)
ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
proposal to enable unbounded work per element.
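
The contrast drawn in the bullets above between the start/advance() state
machine and c.output() can be sketched in plain Java. Both classes below
emit the integers in [from, to). Neither is Beam code: the names
RangeReader, RangeFn and StyleDemo, and the Consumer-based output callback,
are assumptions chosen only to illustrate the difference in coding style.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Reader style (hypothetical): the caller drives a state machine through
// start()/advance(), so the reader keeps its position in a field.
final class RangeReader {
  private final int from;
  private final int to;
  private int current;

  RangeReader(int from, int to) {
    this.from = from;
    this.to = to;
  }

  boolean start() {
    current = from;
    return current < to;
  }

  boolean advance() {
    current++;
    return current < to;
  }

  int getCurrent() {
    return current;
  }
}

// DoFn style (hypothetical): the function owns the loop and pushes each
// result to an output callback; the position is just a local variable.
final class RangeFn {
  static void process(int from, int to, Consumer<Integer> output) {
    for (int i = from; i < to; i++) {
      output.accept(i);
    }
  }
}

class StyleDemo {
  // Drains a reader the way a caller of the state machine would.
  static List<Integer> readAll(RangeReader reader) {
    List<Integer> out = new ArrayList<>();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> viaReader = readAll(new RangeReader(3, 6));
    List<Integer> viaFn = new ArrayList<>();
    RangeFn.process(3, 6, viaFn::add);
    System.out.println(viaReader + " " + viaFn);
  }
}
```

Both paths produce the same elements; the difference is only in who owns
the loop and where the iteration state lives.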

In short:
- Yes, we will support existing Source connectors, and will support
writing new ones, possibly forever. There is no interference with current
users of Source.
- The new API is an attempt to improve the Source API, taken to its
logical limit where it turns out that users' goals can be accomplished
more easily and more generically entirely within ParDo's.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all
existing IOs: basically, what you propose is to remove all Sources and
replace them with NewDoFn.

I'm concerned about this approach, especially in terms of timing: clearly,
IO is the area where we have to move forward in Beam, as it will allow new
users to get started on their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
... and some people started to learn the IO API (Bounded/Unbounded
source, etc).

I think it would make more sense to enhance the IO API (Source) instead
of introducing a NewDoFn.

What are your thoughts for IO writers like me? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

        https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while()
loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

    ProcessContinuation processElement(
            ProcessContext context, OffsetRangeTracker tracker) {
      // One consumer per (topic, partition) element; closed on exit.
      try (KafkaConsumer<String, String> consumer =
                Kafka.subscribe(context.element().topic,
                                context.element().partition)) {
        consumer.seek(tracker.start());
        while (true) {
          ConsumerRecords<String, String> records =
              consumer.poll(100 /* ms */);
          if (records == null) return done();
          for (ConsumerRecord<String, String> record : records) {
            // Stop as soon as this offset can no longer be claimed.
            if (!tracker.tryClaim(record.offset())) {
              return resume().withFutureOutputWatermark(record.timestamp());
            }
            context.output(record);
          }
        }
      }
    }
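
To make the role of tryClaim() in the snippet above more concrete, here is
a minimal sketch of a tracker over a half-open offset range. It is not the
proposed OffsetRangeTracker: the class names SimpleOffsetTracker and
TrackerDemo, the checkpoint() method, and the exact semantics are
assumptions made for illustration only, and the design document remains the
authority on the real API.

```java
// Hypothetical sketch only: SimpleOffsetTracker and checkpoint() are
// illustrative names, not part of the proposed Splittable DoFn API.
final class SimpleOffsetTracker {
  private final long start;      // inclusive, assumed non-negative
  private long end;              // exclusive; shrinks when a checkpoint is taken
  private long lastClaimed = -1; // -1 means nothing has been claimed yet

  SimpleOffsetTracker(long start, long end) {
    if (start < 0 || start > end) {
      throw new IllegalArgumentException("need 0 <= start <= end");
    }
    this.start = start;
    this.end = end;
  }

  long start() {
    return start;
  }

  // Claims the given offset. Returns false if the offset lies at or beyond
  // the end of the range, in which case the caller should stop processing.
  synchronized boolean tryClaim(long offset) {
    if (offset < start || offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= end) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  // Splits off the unclaimed remainder as {from, to} so that it can be
  // resumed later, and truncates this tracker to the part already claimed.
  synchronized long[] checkpoint() {
    long splitAt = (lastClaimed < 0) ? start : lastClaimed + 1;
    long[] residual = {splitAt, end};
    end = splitAt;
    return residual;
  }
}

class TrackerDemo {
  public static void main(String[] args) {
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(100, 200);
    System.out.println(tracker.tryClaim(100));
    System.out.println(tracker.tryClaim(101));
    long[] rest = tracker.checkpoint();
    System.out.println(rest[0] + ".." + rest[1]);
    System.out.println(tracker.tryClaim(102));
  }
}
```

In this sketch, a failed tryClaim() after a checkpoint is what would
correspond to the "return resume()" branch in the reader: the processing
loop stops, and the residual range describes the work left to pick up.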

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com








--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
