Hi Eugene,

I really like the proposal, especially the part about embedding non-Beam
jobs and export jobs prior to pipeline execution - up until now, such work
would have been managed by some 3rd party orchestrator that monitors the end
of the preceding job, and then executes the pipeline. Having this control at
the *SDF* level sounds really great.

I wonder how you see "high-level/direct" translations incorporated here
- the Spark runner, for example, will prefer to directly translate TextIO
into its own API for reading with a file pattern, assuming that Spark's
implementation is optimal (for Spark).

Thanks,
Amit


On Mon, Aug 8, 2016 at 12:33 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Jip, thanks, that answers it.
>
> On Fri, 5 Aug 2016 at 19:51 Eugene Kirpichov <kirpic...@google.com.invalid>
> wrote:
>
> > Hi Aljoscha,
> >
> > AFAIK, the effect of .requiresDeduping() is that the runner inserts a
> > GBK/dedup transform on top of the read. This seems entirely compatible
> > with SDF, except it will be decoupled from the SDF itself: if an SDF
> > produces output that potentially contains duplicates, and there's no easy
> > way to fix it in the SDF itself, and you (developer of the connector)
> > would like to eliminate them, you can explicitly compose the SDF with a
> > canned deduping transform. Does this address your question?
> >
> > Thanks!
> >
> > On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > > I really like the proposal, especially how it unifies a lot of things.
> > >
> > > > One question: How would this work with sources that (right now) return
> > > > true from UnboundedSource.requiresDeduping()? As I understand it, the
> > > > code that executes such sources has to do bookkeeping to ensure that we
> > > > don't get duplicate values. Would we add such a feature for the output
> > > > of DoFns, or would we work towards removing the deduping functionality
> > > > from Beam and push it into the source implementations?
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > > On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net>
> > > > wrote:
> > >
> > > > > By the way, I like the use cases you are introducing: we discussed
> > > > > similar use cases with Dan.
> > > > >
> > > > > I just wonder about the existing IO.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > > > <kirpic...@google.com.INVALID> wrote:
> > > >
> > > > > Hello Beam community,
> > > > >
> > > > > > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > > > > "Splittable DoFn" - a major generalization of DoFn, which allows
> > > > > > processing of a single element to be non-monolithic, i.e.
> > > > > > checkpointable and parallelizable, as well as doing an unbounded
> > > > > > amount of work per element.
> > > > >
> > > > > > This allows effectively replacing the current Bounded/UnboundedSource
> > > > > > APIs with DoFns that are much easier to code, more scalable and
> > > > > > composable with the rest of the Beam programming model, and enables
> > > > > > many use cases that were previously difficult or impossible, as well
> > > > > > as some non-obvious new use cases.
> > > > >
> > > > > > This proposal has been mentioned before in JIRA [BEAM-65] and some
> > > > > > Beam meetings, and now the whole thing is written up in a document:
> > > > >
> > > > >         https://s.apache.org/splittable-do-fn
> > > > >
> > > > > > Here are some things that become possible with Splittable DoFn:
> > > > > > - Efficiently read a filepattern matching millions of files
> > > > > > - Read a collection of files that are produced by an earlier step in
> > > > > > the pipeline (e.g. easily implement a connector to a storage system
> > > > > > that can export itself to files)
> > > > > > - Implement a Kafka reader by composing a "list partitions" DoFn with
> > > > > > a DoFn that simply polls a consumer and outputs new records in a
> > > > > > while() loop
> > > > > > - Implement a log tailer by composing a DoFn that incrementally
> > > > > > returns new files in a directory and a DoFn that tails a file
> > > > > > - Implement a parallel "count friends in common" algorithm (matrix
> > > > > > squaring) with good work balancing
> > > > >
> > > > > > Here is the meaningful part of a hypothetical Kafka reader written
> > > > > > against this API:
> > > > >
> > > > > >     ProcessContinuation processElement(
> > > > > >             ProcessContext context, OffsetRangeTracker tracker) {
> > > > > >       try (KafkaConsumer<String, String> consumer =
> > > > > >                 Kafka.subscribe(context.element().topic,
> > > > > >                                 context.element().partition)) {
> > > > > >         consumer.seek(tracker.start());
> > > > > >         while (true) {
> > > > > >           ConsumerRecords<String, String> records = consumer.poll(100ms);
> > > > > >           if (records == null) return done();
> > > > > >           for (ConsumerRecord<String, String> record : records) {
> > > > > >             if (!tracker.tryClaim(record.offset())) {
> > > > > >               return resume().withFutureOutputWatermark(record.timestamp());
> > > > > >             }
> > > > > >             context.output(record);
> > > > > >           }
> > > > > >         }
> > > > > >       }
> > > > > >     }
> > > > >
> > > > > > The document describes in detail the motivations behind this feature,
> > > > > > the basic idea and API, open questions, and outlines an incremental
> > > > > > delivery plan.
> > > > >
> > > > > > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > > > > > and is loosely related to "State and Timers for DoFn" [beam-state].
> > > > >
> > > > > Please take a look and comment!
> > > > >
> > > > > Thanks.
> > > > >
> > > > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > > [beam-state] https://s.apache.org/beam-state
> > > >
> > > >
> > > >
> > >
> >
>
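
For reference, the "canned deduping transform" idea from Eugene's quoted reply can be illustrated in plain Java. This is only a minimal sketch and not Beam code: the class DedupSketch and the method dedupById are hypothetical names, and the in-memory HashSet is an assumption made for brevity (a runner would more likely use a GBK or keyed state, as the reply suggests).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical illustration only; not part of the Beam API.
public class DedupSketch {
    /**
     * Keeps the first element seen for each id and drops later duplicates.
     * The id function plays the role of a record id supplied by the connector.
     */
    static <T, K> List<T> dedupById(Iterable<T> input, Function<T, K> idFn) {
        Set<K> seen = new HashSet<>();   // ids already emitted (unbounded here, by assumption)
        List<T> out = new ArrayList<>();
        for (T element : input) {
            // Set.add returns false when the id was already present.
            if (seen.add(idFn.apply(element))) {
                out.add(element);
            }
        }
        return out;
    }
}
```

The point of the sketch is that deduplication is an ordinary step applied to the output of the reading step, rather than a flag on the source.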

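The tryClaim() loop in the quoted Kafka example can also be tried out without Beam or Kafka. The following is a rough sketch under stated assumptions: RangeTracker is a hypothetical stand-in for the OffsetRangeTracker mentioned in the proposal (not its real implementation), checkpoint() is an assumed name for the split operation, and a long[] array stands in for polled records.

```java
import java.util.List;

// Hypothetical illustration of the claim/checkpoint protocol; not Beam code.
public class ClaimLoopSketch {
    /** Stand-in tracker over the offset range [start, end). */
    static final class RangeTracker {
        private final long start;
        private long end;          // exclusive; shrinks when a checkpoint is taken
        private long lastClaimed;

        RangeTracker(long start, long end) {
            this.start = start;
            this.end = end;
            this.lastClaimed = start - 1;
        }

        long start() { return start; }

        /** Claims an offset; returns false once the offset is outside the range. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= end) return false;
            lastClaimed = offset;
            return true;
        }

        /** Cuts the range after the last claimed offset and returns the remainder. */
        synchronized RangeTracker checkpoint() {
            long splitAt = lastClaimed + 1;
            RangeTracker residual = new RangeTracker(splitAt, end);
            end = splitAt;
            return residual;
        }
    }

    /** Outputs claimed offsets in order; returns true if work remains (resume). */
    static boolean process(long[] records, RangeTracker tracker, List<Long> out) {
        for (long offset : records) {
            if (offset < tracker.start()) continue;     // mimics seek(tracker.start())
            if (!tracker.tryClaim(offset)) return true; // claim failed: stop and resume later
            out.add(offset);
        }
        return false; // no more records: done
    }
}
```

In this sketch a checkpoint simply hands the unclaimed tail of the range to a new tracker, which is one way to picture how processing of a single element could be paused and continued elsewhere.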