I agree that the proper API for enabling the use case "do something after
the data has been written" is to return a PCollection of objects where each
object represents the result of writing some identifiable subset of the
data. Then one can apply a ParDo to this PCollection, in order to "do
something after this subset has been written".
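
To make that shape concrete, here is a minimal plain-Java sketch. It is not Beam API: WriteResult, write and afterWrite are hypothetical names made up for illustration. The stand-in "sink" returns one result per identifiable subset, and the follow-up step runs once per result, roughly the way a ParDo over the result collection would:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WriteThenActSketch {
    /** Hypothetical result type: what was written for one identifiable subset. */
    public static final class WriteResult {
        public final String subsetId;
        public final int recordsWritten;

        public WriteResult(String subsetId, int recordsWritten) {
            this.subsetId = subsetId;
            this.recordsWritten = recordsWritten;
        }
    }

    /** Stand-in for a sink: "writes" each subset and reports one result per subset. */
    public static List<WriteResult> write(Map<String, List<String>> subsets) {
        List<WriteResult> results = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : subsets.entrySet()) {
            // A real sink would perform the external side effect here.
            results.add(new WriteResult(e.getKey(), e.getValue().size()));
        }
        return results;
    }

    /** Stand-in for the downstream step: runs once per write result. */
    public static List<String> afterWrite(List<WriteResult> results) {
        List<String> log = new ArrayList<>();
        for (WriteResult r : results) {
            log.add("subset " + r.subsetId + " done: " + r.recordsWritten + " records");
        }
        return log;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subsets = new LinkedHashMap<>();
        subsets.put("window-1", Arrays.asList("a", "b"));
        for (String line : afterWrite(write(subsets))) {
            System.out.println(line); // prints "subset window-1 done: 2 records"
        }
    }
}
```

The hard part, discussed next, is what "subsetId" should actually be.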

The challenging part here is *identifying* the subset of the data that's
been written, in a way consistent with Beam's unified batch/streaming
model, where saying "all data has been written" is not an option because
more data can arrive.

The next choice is "a window of input has been written", but then again,
late data can arrive into a window as well.

Next choice after that is "a pane of input has been written", but per
https://s.apache.org/beam-sink-triggers the term "pane of input" is moot:
triggering and panes should be something private to the sink, and the same
input can trigger different sinks differently. The hypothetical different
accumulation modes make this trickier still. I'm not sure whether we intend
to also challenge the idea that windowing is inherent to the collection, or
whether it too should be specified on a transform that processes the
collection. I think for the sake of this discussion we can assume that it's
inherent, and assume the mental model that the elements in different
windows of a PCollection are processed independently - "as if" there were
multiple pipelines processing each window.

Overall, embracing the full picture, we end up with something like this:
- The input PCollection is a composition of windows.
- If the windowing strategy is non-merging (e.g. fixed or sliding windows),
the below applies to the entire contents of the PCollection. If it's
merging (e.g. session windows), then it applies per-key, and the input
should be (perhaps implicitly) keyed in a way that the sink understands -
for example, the grouping by destination in DynamicDestinations in file and
bigquery writes.
- Each window's contents is a "changelog": a stream of elements and
retractions.
- A "sink" processes each window of the collection, deciding how to handle
elements and retractions (and whether to support retractions at all) in a
sink-specific way, and deciding *when* to perform the side effects for a
portion of the changelog (a "pane") based on the sink's triggering strategy.
- If the side effect itself is parallelized, then there'll be multiple
results for the pane - e.g. one per bundle.
- Each (sink-chosen) pane produces a set of results, e.g. a list of
filenames that have been written, or simply the number of records
written, or a placeholder void value. The result will implicitly include the
window of the input it's associated with. It will also implicitly include
pane information - index of the pane in this window, and whether this is
the first or last pane.
- The partitioning into bundles is an implementation detail and not very
useful, so before presenting the pane write results to the user, the sink
will probably want to Combine the bundle results so that there ends up
being 1 value for each pane that was written. Once again note that panes
may be associated with windows of the input as a whole, but if the input is
keyed (like with DynamicDestinations) they'll be associated with per-key
subsets of windows of the input.
- This combining requires an extra, well, combining operation, so it should
be optional.
- The user will end up getting either a PCollection<ResultT> or a
PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
the elements of this collection will implicitly have window and pane
information, available via the implicit BoundedWindow and PaneInfo.
- Until "sink triggering" is implemented, we'll have to embrace the fact
that trigger strategy is set on the input. But in that case the user will
have to accept that the PaneInfo of ResultT's is not necessarily directly
related to panes of the input - the sink is allowed to do internal
aggregation as an implementation detail, which may modify the triggering
strategy. Basically the user will still get sink-assigned panes.
- In most cases, one may imagine that the user is interested in being
notified of "no more data associated with this window will be written", so
the user will ignore all ResultT's except those where the pane is marked
final. If a user is interested in being notified of intermediate write
results - they'll have to embrace the fact that they cannot identify the
precise subset of input associated with the intermediate result.
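
A rough plain-Java model of the bundle-to-pane combining described above, assuming each bundle result carries its window, its pane index and a last-pane flag. WriteResult and combinePerPane are hypothetical names, not Beam classes; in a real sink this step would presumably be a Combine over the bundle results:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PaneResultSketch {
    /** Hypothetical write result, tagged with its window and sink-chosen pane. */
    public static final class WriteResult {
        public final String window;
        public final int paneIndex;
        public final boolean lastPane;
        public final long records;

        public WriteResult(String window, int paneIndex, boolean lastPane, long records) {
            this.window = window;
            this.paneIndex = paneIndex;
            this.lastPane = lastPane;
            this.records = records;
        }
    }

    /** Collapses per-bundle results so that exactly one value remains per (window, pane). */
    public static List<WriteResult> combinePerPane(List<WriteResult> bundleResults) {
        Map<String, WriteResult> byPane = new LinkedHashMap<>();
        for (WriteResult b : bundleResults) {
            String key = b.window + "#" + b.paneIndex;
            WriteResult prev = byPane.get(key);
            long total = (prev == null ? 0L : prev.records) + b.records;
            byPane.put(key, new WriteResult(b.window, b.paneIndex, b.lastPane, total));
        }
        return new ArrayList<>(byPane.values());
    }

    public static void main(String[] args) {
        List<WriteResult> bundles = new ArrayList<>();
        bundles.add(new WriteResult("w1", 0, false, 3)); // two bundles of pane 0
        bundles.add(new WriteResult("w1", 0, false, 2));
        bundles.add(new WriteResult("w1", 1, true, 4));  // one bundle of the last pane
        for (WriteResult p : combinePerPane(bundles)) {
            System.out.println(p.window + " pane " + p.paneIndex + ": " + p.records);
        }
    }
}
```

For keyed input (e.g. DynamicDestinations) the grouping key would also include the destination; the sketch leaves that out for brevity.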

I think the really key points of the above are:
- Sinks should support windowed input. Sinks should write different windows
of input independently. If the sink can write multi-destination input, the
destination should function as a grouping key, and in that case merging
windowing should be allowed.
- Producing a PCollection of write results should be optional.
- When asked to produce results, sinks produce a PCollection of results
that may be keyed or unkeyed (per above), and are placed in the window of
the input that was written, and have a PaneInfo assigned by the sink, of
which probably the only part useful to the user is whether it is the final
pane (PaneInfo.isLast()).
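
From the user's side, the last point might look roughly like the plain-Java sketch below. KeyedResult, its fields and finalizedWrites are hypothetical stand-ins (for KV<KeyT, ResultT> plus the implicit window and pane information), not Beam API; the only point is that the downstream step ignores every result except those whose pane is the last one:

```java
import java.util.ArrayList;
import java.util.List;

public class FinalPaneSketch {
    /** Hypothetical keyed write result with its window and a last-pane flag. */
    public static final class KeyedResult {
        public final String destination;
        public final String window;
        public final boolean lastPane;

        public KeyedResult(String destination, String window, boolean lastPane) {
            this.destination = destination;
            this.window = window;
            this.lastPane = lastPane;
        }
    }

    /** Returns "destination@window" for every result that marks the last pane. */
    public static List<String> finalizedWrites(List<KeyedResult> results) {
        List<String> done = new ArrayList<>();
        for (KeyedResult r : results) {
            if (r.lastPane) {
                // Only here is it safe to say "no more data for this window".
                done.add(r.destination + "@" + r.window);
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<KeyedResult> results = new ArrayList<>();
        results.add(new KeyedResult("tableA", "w1", false));
        results.add(new KeyedResult("tableA", "w1", true));
        results.add(new KeyedResult("tableB", "w1", false));
        System.out.println(finalizedWrites(results)); // prints [tableA@w1]
    }
}
```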

Does this sound reasonable?

On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:

> +1
>
> At the very least an empty PCollection<?> could be produced with no
> promises about its contents but the ability to be followed (e.g. as a
> side input), which is forward compatible with whatever actual metadata
> one may decide to produce in the future.
>
> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
> > +dev@
> >
> > I am in complete agreement with Luke. Data dependencies are easy to
> > understand and a good way for an IO to communicate and establish causal
> > dependencies. Converting an IO from PDone to real output may spur further
> > useful thoughts based on the design decisions about what sort of output
> > is most useful.
> >
> > Kenn
> >
> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
> >>
> >> I think all sinks actually do have valuable information to output which
> >> can be used after a write (file names, transaction/commit/row ids, table
> >> names, ...). In addition to this metadata, having a PCollection of all
> >> successful writes and all failed writes is useful for users so they can
> >> chain an action which depends on what was or wasn't successfully
> >> written. Users have requested adding retry/failure handling policies to
> >> sinks so that failed writes don't jam up the pipeline.
> >>
> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com>
> >> wrote:
> >>>
> >>> So I agree generally with the idea that returning a PCollection makes
> >>> all of this easier, so that arbitrary additional functions can be
> >>> added, but what exactly would write functions return in a PCollection
> >>> that would make sense? The whole idea is that we’ve written to an
> >>> external source and now the collection itself is no longer needed.
> >>>
> >>> Currently, that’s represented with a PDone, which doesn’t allow any
> >>> work to occur after it. I see a few possible ways of handling this
> >>> given this conversation, and am curious which solution sounds like
> >>> the best way to deal with the problem:
> >>>
> >>> 1. Have output transforms always return something specific (which would
> >>> be the same across transforms by convention), that is in the form of a
> >>> PCollection, so operations can occur after it.
> >>>
> >>> 2. Make either PDone or some new type that can act as a PCollection so
> >>> we can run applies afterward.
> >>>
> >>> 3. Make output transforms provide the facility for a callback function
> >>> which runs after the transform is complete.
> >>>
> >>> I went through these gymnastics recently when I was trying to build
> >>> something that would move indices after writing to Algolia, and the
> >>> solution was to co-opt code from the old Sink class that used to exist
> >>> in Beam. The problem is that particular method requires the output
> >>> transform in question to return a PCollection, even if it is trivial
> >>> or doesn’t make sense to return one. This seems like a bad solution,
> >>> but unfortunately there isn’t a notion of a transform that has no
> >>> explicit output that needs to have operations occur after it.
> >>>
> >>> The three potential solutions above address this issue, but I would
> >>> like to hear which would be preferable (or perhaps a different
> >>> proposal altogether?). Perhaps we could also start up a ticket on
> >>> this, since it seems like a worthwhile feature addition. I would find
> >>> it really useful, for one.
> >>>
> >>> Chet
> >>>
> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
> >>>
> >>> Instead of a callback fn, it's most useful if a PCollection is returned
> >>> containing the result of the sink so that any arbitrary additional
> >>> functions can be applied.
> >>>
> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> >>> wrote:
> >>>>
> >>>> Agree, I would prefer to do the callback in the IO more than in the
> >>>> main.
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
> >>>>>
> >>>>> I do something almost exactly like this, but with BigtableIO instead.
> >>>>> I have a pull request open here [1] (which reminds me I need to
> >>>>> finish this up...). It would really be nice for most IOs to support
> >>>>> something like this.
> >>>>>
> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output
> >>>>> from the BigtableIO, and then feed that into your function which will
> >>>>> run when all writes finish.
> >>>>>
> >>>>> You probably want to avoid doing something in the main method because
> >>>>> there's no guarantee it'll actually run (maybe the driver will die,
> >>>>> get killed, machine will explode, etc).
> >>>>>
> >>>>> [1] https://github.com/apache/beam/pull/3997
> >>>>>
> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
> >>>>>
> >>>>>     Assuming you're in Java. You could just follow on in your Main
> >>>>> method.
> >>>>>     Checking the state of the Result.
> >>>>>
> >>>>>     Example:
> >>>>>     PipelineResult result = pipeline.run();
> >>>>>     try {
> >>>>>       result.waitUntilFinish();
> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
> >>>>>         // DO ES work
> >>>>>       }
> >>>>>     } catch (Exception e) {
> >>>>>       result.cancel();
> >>>>>       throw e;
> >>>>>     }
> >>>>>
> >>>>>     Otherwise you could also use Oozie to construct a work flow.
> >>>>>
> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
> >>>>>     <j...@nanthrax.net> wrote:
> >>>>>
> >>>>>         Hi,
> >>>>>
> >>>>>         yes, we had a similar question some days ago.
> >>>>>
> >>>>>         We can imagine having a user callback fn fired when the sink
> >>>>>         batch is complete.
> >>>>>
> >>>>>         Let me think about that.
> >>>>>
> >>>>>         Regards
> >>>>>         JB
> >>>>>
> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
> >>>>>
> >>>>>             Hey JB,
> >>>>>
> >>>>>             Thanks for getting back so quickly.
> >>>>>             I suppose in that case I would need a way of monitoring
> >>>>>             when the ES transform completes successfully before I can
> >>>>>             proceed with doing the swap.
> >>>>>             The problem with this is that I can't think of a good way
> >>>>>             to determine that termination state short of polling the
> >>>>>             new index to check the document count compared to the size
> >>>>>             of input PCollection.
> >>>>>             That, or maybe I'd need to use an external system like you
> >>>>>             mentioned to poll on the state of the pipeline (I'm using
> >>>>>             Google Dataflow, so maybe there's a way to do this with
> >>>>>             some API).
> >>>>>             But I would have thought that there would be an easy way
> >>>>>             of simply saying "do not process this transform until this
> >>>>>             other transform completes".
> >>>>>             Is there no established way of "signaling" between
> >>>>>             pipelines when some pipeline completes, or have some way
> >>>>>             of declaring a dependency of 1 transform on another
> >>>>>             transform?
> >>>>>
> >>>>>             Thanks again,
> >>>>>             Philip
> >>>>>
> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
> >>>>>             <j...@nanthrax.net> wrote:
> >>>>>
> >>>>>                  Hi Philip,
> >>>>>
> >>>>>                  You won't be able to do (3) in the same pipeline as
> >>>>>                  the Elasticsearch Sink PTransform ends the pipeline
> >>>>>                  with PDone.
> >>>>>
> >>>>>                  So, (3) has to be done in another pipeline (using a
> >>>>>                  DoFn) or in another "system" (like Camel for
> >>>>>                  instance). I would do a check of the data in the
> >>>>>                  index and then trigger the swap there.
> >>>>>
> >>>>>                  Regards
> >>>>>                  JB
> >>>>>
> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
> >>>>>
> >>>>>                      Hi,
> >>>>>
> >>>>>                      I'm pretty new to Beam, and I've been trying to
> >>>>>                      use the ElasticsearchIO sink to write docs into ES.
> >>>>>                      With this, I want to be able to
> >>>>>                      1. ingest and transform rows from DB (done)
> >>>>>                      2. write JSON docs/strings into a new ES index
> >>>>>                      (done)
> >>>>>                      3. After (2) is complete and all documents are
> >>>>>                      written into a new index, trigger an atomic index
> >>>>>                      swap under an alias to replace the current aliased
> >>>>>                      index with the new index generated in step 2. This
> >>>>>                      is basically a single POST request to the ES
> >>>>>                      cluster.
> >>>>>
> >>>>>                      The problem I'm facing is that I can't find a way
> >>>>>                      for (3) to happen after step (2) is complete.
> >>>>>
> >>>>>                      The ElasticsearchIO.Write transform returns a
> >>>>>                      PDone, and I'm not sure how to proceed from there
> >>>>>                      because it doesn't seem to let me do another apply
> >>>>>                      on it to "define" a dependency.
> >>>>>
> >>>>>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
> >>>>>
> >>>>>                      Is there a recommended way to construct pipeline
> >>>>>                      workflows like this?
> >>>>>
> >>>>>                      Thanks in advance,
> >>>>>                      Philip
> >>>>>
> >>>>>
> >>>>>                  --     Jean-Baptiste Onofré
> >>>>>             jbono...@apache.org <mailto:jbono...@apache.org>
> >>>>>             <mailto:jbono...@apache.org <mailto:jbono...@apache.org
> >>
> >>>>>             http://blog.nanthrax.net
> >>>>>                  Talend - http://www.talend.com
> >>>>>
> >>>>>
> >>>>>
> >>>>>         --         Jean-Baptiste Onofré
> >>>>>         jbono...@apache.org <mailto:jbono...@apache.org>
> >>>>>         http://blog.nanthrax.net
> >>>>>         Talend - http://www.talend.com
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>     --     Nick Verbeck - NerdyNick
> >>>>>     ----------------------------------------------------
> >>>>>     NerdyNick.com <http://NerdyNick.com>
> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> jbono...@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>
> >>>
> >>>
> >>
> >
>
