Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for the existing
IOs that people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based
connectors will continue to work (though the document proposes, at some
point, making Read.from(Source) translate to a wrapper SDF under the
hood, both to exercise the feature more and to make sure that it is strictly
more powerful; this is an optional implementation detail).
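
To make the wrapper idea concrete, here is a minimal, self-contained Java
sketch. It is not Beam code: Reader, Source, fromList and processElement are
hypothetical stand-ins, and splitting and checkpointing are left out
entirely. It only shows that a start/advance() reader can be driven from
inside a DoFn-style function that takes the source as its input element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class SourceWrapperSketch {
  /** Hypothetical stand-in for a Source-style reader: a start/advance state machine. */
  interface Reader<T> {
    boolean start();
    boolean advance();
    T getCurrent();
  }

  /** Hypothetical stand-in for a source: it only knows how to create a reader. */
  interface Source<T> {
    Reader<T> createReader();
  }

  /** The "wrapper": takes a source as its element and emits every record it reads. */
  static <T> void processElement(Source<T> source, Consumer<T> output) {
    Reader<T> reader = source.createReader();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      output.accept(reader.getCurrent());
    }
  }

  /** Hypothetical helper: an in-memory source backed by a list. */
  static <T> Source<T> fromList(List<T> items) {
    return () -> new Reader<T>() {
      private final Iterator<T> it = items.iterator();
      private T current;

      @Override public boolean start() { return advance(); }

      @Override public boolean advance() {
        if (!it.hasNext()) {
          return false;
        }
        current = it.next();
        return true;
      }

      @Override public T getCurrent() { return current; }
    };
  }

  public static void main(String[] args) {
    List<String> out = new ArrayList<>();
    processElement(fromList(Arrays.asList("a", "b", "c")), out::add);
    System.out.println(out); // prints [a, b, c]
  }
}
```

An existing connector would not need to change in this picture; only the
code that drives its reader would move into the wrapper.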

Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.

To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:
- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given appropriate
implementation, would solve most of the scalability and composability
issues of IOs. Then most connectors would look like: ParDo<A, Source<B>>
+ Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDo's as well: e.g. ability to report progress when processing a very
heavy element, or ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B: isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources and
ParDo's, in particular: can we bring the magic of sources to ParDo's but
without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became clear that it is
strictly more general than sources; at least, in the respect that sources
have to produce output, while DoFn's don't: an SDF may very well produce no
output at all, and simply perform a side effect in a parallel/resumable way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, on what the relation between
SDF and DoFn should be, etc. They culminated in the current proposal. The
proposal comes at a time when a couple of key ingredients are (almost)
ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
proposal to enable unbounded work per element.
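
To illustrate where this ended up, here is a small runnable Java sketch of a
DoFn-style loop that claims offsets from a tracker. OffsetTracker here is a
simplified, hypothetical class, not the RangeTracker API from the document,
and the resume convention (returning the next offset, or -1 when done) is an
assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplittableSketch {
  /** Hypothetical, simplified tracker over the offset range [start, end). */
  static class OffsetTracker {
    private final long start;
    private final long end; // exclusive

    OffsetTracker(long start, long end) {
      this.start = start;
      this.end = end;
    }

    long start() { return start; }

    /** Returns true if the offset is inside the range and may be processed. */
    boolean tryClaim(long offset) {
      return offset < end;
    }
  }

  /**
   * DoFn-style body: a plain loop calling output.add(), with no start/advance()
   * state machine. Returns the offset to resume from, or -1 if the log is exhausted.
   */
  static long processElement(List<String> log, OffsetTracker tracker, List<String> output) {
    for (long offset = tracker.start(); offset < log.size(); offset++) {
      if (!tracker.tryClaim(offset)) {
        return offset; // outside our range: stop here so the rest can run elsewhere or later
      }
      output.add(log.get((int) offset));
    }
    return -1;
  }

  public static void main(String[] args) {
    List<String> log = Arrays.asList("a", "b", "c", "d", "e");
    List<String> out = new ArrayList<>();

    // First call only owns offsets [0, 2).
    long resumeAt = processElement(log, new OffsetTracker(0, 2), out);
    System.out.println(out + " resume at " + resumeAt); // prints [a, b] resume at 2

    // Second call resumes where the first one stopped.
    long done = processElement(log, new OffsetTracker(resumeAt, Long.MAX_VALUE), out);
    System.out.println(out + " result " + done); // prints [a, b, c, d, e] result -1
  }
}
```

The point of the sketch is that the function body stays an ordinary loop,
while all knowledge about which part of the work it owns lives in the tracker.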

In short:
- Yes, we will support existing Source connectors, and will support writing
new ones, possibly forever. There is no interference with current users of
Source.
- The new API is an attempt to improve the Source API, taken to its logical
limit where it turns out that users' goals can be accomplished more easily
and more generically entirely within ParDo's.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Eugene,
>
> Just a question: why is it in DoFn and not an improvement of Source ?
>
> If I understand correctly, it means that we will have to refactor all
> existing IOs: basically, what you propose is to remove all Sources and
> replace them with NewDoFn.
>
> I'm concerned with this approach, especially in terms of timing: clearly,
> the IO is the area where we have to move forward in Beam as it will
> allow new users to start in their projects.
> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> ... and some people started to learn the IO API (Bounded/Unbounded
> source, etc).
>
> I think it would make more sense to enhance the IO API (Source) instead
> of introducing a NewDoFn.
>
> What are your thoughts for IO writers like me ? ;)
>
> Regards
> JB
>
> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > Hello Beam community,
> >
> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > "Splittable DoFn" - a major generalization of DoFn, which allows processing
> > of a single element to be non-monolithic, i.e. checkpointable and
> > parallelizable, as well as doing an unbounded amount of work per element.
> >
> > This allows effectively replacing the current Bounded/UnboundedSource APIs
> > with DoFn's that are much easier to code, more scalable and composable with
> > the rest of the Beam programming model, and enables many use cases that
> > were previously difficult or impossible, as well as some non-obvious new
> > use cases.
> >
> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > meetings, and now the whole thing is written up in a document:
> >
> >         https://s.apache.org/splittable-do-fn
> >
> > Here are some things that become possible with Splittable DoFn:
> > - Efficiently read a filepattern matching millions of files
> > - Read a collection of files that are produced by an earlier step in the
> > pipeline (e.g. easily implement a connector to a storage system that can
> > export itself to files)
> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> > DoFn that simply polls a consumer and outputs new records in a while() loop
> > - Implement a log tailer by composing a DoFn that incrementally returns new
> > files in a directory and a DoFn that tails a file
> > - Implement a parallel "count friends in common" algorithm (matrix
> > squaring) with good work balancing
> >
> > Here is the meaningful part of a hypothetical Kafka reader written against
> > this API:
> >
> >     ProcessContinuation processElement(
> >             ProcessContext context, OffsetRangeTracker tracker) {
> >       try (KafkaConsumer<String, String> consumer =
> >                 Kafka.subscribe(context.element().topic,
> >                                 context.element().partition)) {
> >         consumer.seek(tracker.start());
> >         while (true) {
> >           ConsumerRecords<String, String> records = consumer.poll(100ms);
> >           if (records == null) return done();
> >           for (ConsumerRecord<String, String> record : records) {
> >             if (!tracker.tryClaim(record.offset())) {
> >               return resume().withFutureOutputWatermark(record.timestamp());
> >             }
> >             context.output(record);
> >           }
> >         }
> >       }
> >     }
> >
> > The document describes in detail the motivations behind this feature, the
> > basic idea and API, open questions, and outlines an incremental delivery
> > plan.
> >
> > The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
> > loosely related to "State and Timers for DoFn" [beam-state].
> >
> > Please take a look and comment!
> >
> > Thanks.
> >
> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > [new-do-fn] https://s.apache.org/a-new-do-fn
> > [beam-state] https://s.apache.org/beam-state
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
