This would be absolutely great! It seems somewhat similar to the changes
that were made to the BigQuery sink to support WriteResult (
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
).

I find it helpful to think about the different things that may come after a
sink. For instance:

1. It might be helpful to have a collection of failing input elements. The
type of failed elements is pretty straightforward -- just the input
elements. This allows handling such failures by directing them elsewhere or
performing additional processing.

2. For a sink that produces a series of files, it might be useful to have a
collection of the file names that have been completely written. This allows
performing additional handling on these completed segments.

3. For a sink that updates some destination, it would be reasonable to have
a collection that provides (periodically) output indicating how complete
the information written to that destination is. For instance, this might be
something like "<this bigquery table> has all of the elements up to <input
watermark>" complete. This allows tracking how much information has been
completely written out.

I think those concepts map to the more detailed description Eugene
provided, but I find it helpful to focus on what information comes out of
the sink and how it might be used.

Were there any use cases the above miss? Any functionality that has been
described that doesn't map to these use cases?

-- Ben

On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> It makes sense to consider how this maps onto existing kinds of sinks.
>
> E.g.:
> - Something that just makes an RPC per record, e.g. MqttIO.write(): that
> will emit 1 result per bundle (either a bogus value or number of records
> written) that will be Combine'd into 1 result per pane of input. A user can
> sequence against this and be notified when some intermediate amount of data
> has been written for a window, or (via .isFinal()) when all of it has been
> written.
> - Something that e.g. initiates an import job, such as BigQueryIO.write(),
> or an ElasticsearchIO write with a follow-up atomic index swap: should emit
> 1 result per import job, e.g. containing information about the job (e.g.
> its id and statistics). Role of panes is the same.
> - Something like above but that supports dynamic destinations: like in
> WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> where
> ResultT may be something like a list of files that were written for this
> pane of this destination.
>
> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> I agree that the proper API for enabling the use case "do something after
>> the data has been written" is to return a PCollection of objects where each
>> object represents the result of writing some identifiable subset of the
>> data. Then one can apply a ParDo to this PCollection, in order to "do
>> something after this subset has been written".
>>
>> The challenging part here is *identifying* the subset of the data that's
>> been written, in a way consistent with Beam's unified batch/streaming
>> model, where saying "all data has been written" is not an option because
>> more data can arrive.
>>
>> The next choice is "a window of input has been written", but then again,
>> late data can arrive into a window as well.
>>
>> Next choice after that is "a pane of input has been written", but per
>> https://s.apache.org/beam-sink-triggers the term "pane of input" is
>> moot: triggering and panes should be something private to the sink, and the
>> same input can trigger different sinks differently. The hypothetical
>> different accumulation modes make this trickier still. I'm not sure whether
>> we intend to also challenge the idea that windowing is inherent to the
>> collection, or whether it too should be specified on a transform that
>> processes the collection. I think for the sake of this discussion we can
>> assume that it's inherent, and assume the mental model that the elements in
>> different windows of a PCollection are processed independently - "as if"
>> there were multiple pipelines processing each window.
>>
>> Overall, embracing the full picture, we end up with something like this:
>> - The input PCollection is a composition of windows.
>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>> windows), the below applies to the entire contents of the PCollection. If
>> it's merging (e.g. session windows), then it applies per-key, and the input
>> should be (perhaps implicitly) keyed in a way that the sink understands -
>> for example, the grouping by destination in DynamicDestinations in file and
>> bigquery writes.
>> - Each window's contents is a "changelog" - stream of elements and
>> retractions.
>> - A "sink" processes each window of the collection, deciding how to
>> handle elements and retractions (and whether to support retractions at all)
>> in a sink-specific way, and deciding *when* to perform the side effects for
>> a portion of the changelog (a "pane") based on the sink's triggering
>> strategy.
>> - If the side effect itself is parallelized, then there'll be multiple
>> results for the pane - e.g. one per bundle.
>> - Each (sink-chosen) pane produces a set of results, e.g. a list of
>> filenames that have been written, or simply a number of records that was
>> written, or a bogus void value etc. The result will implicitly include the
>> window of the input it's associated with. It will also implicitly include
>> pane information - index of the pane in this window, and whether this is
>> the first or last pane.
>> - The partitioning into bundles is an implementation detail and not very
>> useful, so before presenting the pane write results to the user, the sink
>> will probably want to Combine the bundle results so that there ends up
>> being 1 value for each pane that was written. Once again note that panes
>> may be associated with windows of the input as a whole, but if the input is
>> keyed (like with DynamicDestinations) they'll be associated with per-key
>> subsets of windows of the input.
>> - This combining requires an extra, well, combining operation, so it
>> should be optional.
>> - The user will end up getting either a PCollection<ResultT> or a
>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
>> the elements of this collection will implicitly have window and pane
>> information, available via the implicit BoundedWindow and PaneInfo.
>> - Until "sink triggering" is implemented, we'll have to embrace the fact
>> that trigger strategy is set on the input. But in that case the user will
>> have to accept that the PaneInfo of ResultT's is not necessarily directly
>> related to panes of the input - the sink is allowed to do internal
>> aggregation as an implementation detail, which may modify the triggering
>> strategy. Basically the user will still get sink-assigned panes.
>> - In most cases, one may imagine that the user is interested in being
>> notified of "no more data associated with this window will be written", so
>> the user will ignore all ResultT's except those where the pane is marked
>> final. If a user is interested in being notified of intermediate write
>> results - they'll have to embrace the fact that they cannot identify the
>> precise subset of input associated with the intermediate result.
>>
>> I think the really key points of the above are:
>> - Sinks should support windowed input. Sinks should write different
>> windows of input independently. If the sink can write multi-destination
>> input, the destination should function as a grouping key, and in that case
>> merging windowing should be allowed.
>> - Producing a PCollection of write results should be optional.
>> - When asked to produce results, sinks produce a PCollection of results
>> that may be keyed or unkeyed (per above), and are placed in the window of
>> the input that was written, and have a PaneInfo assigned by the sink, of
>> which probably the only part useful to the user is whether it's .isFinal().
>>
>> Does this sound reasonable?
>>
>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> +1
>>>
>>> At the very least an empty PCollection<?> could be produced with no
>>> promises about its contents but the ability to be followed (e.g. as a
>>> side input), which is forward compatible with whatever actual metadata
>>> one may decide to produce in the future.
>>>
>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>>> > +dev@
>>> >
>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>> > understand and a good way for an IO to communicate and establish causal
>>> > dependencies. Converting an IO from PDone to real output may spur
>>> further
>>> > useful thoughts based on the design decisions about what sort of
>>> output is
>>> > most useful.
>>> >
>>> > Kenn
>>> >
>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>> >>
>>> >> I think all sinks actually do have valuable information to output
>>> which
>>> >> can be used after a write (file names, transaction/commit/row ids,
>>> table
>>> >> names, ...). In addition to this metadata, having a PCollection of all
>>> >> successful writes and all failed writes is useful for users so they
>>> can
>>> >> chain an action which depends on what was or wasn't successfully
>>> written.
>>> >> Users have requested adding retry/failure handling policies to sinks
>>> so that
>>> >> failed writes don't jam up the pipeline.
>>> >>
>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>> chet.aldr...@postmates.com>
>>> >> wrote:
>>> >>>
>>> >>> So I agree generally with the idea that returning a PCollection
>>> makes all
>>> >>> of this easier so that arbitrary additional functions can be added,
>>> what
>>> >>> exactly would write functions be returning in a PCollection that
>>> would make
>>> >>> sense? The whole idea is that we’ve written to an external source
>>> and now
>>> >>> the collection itself is no longer needed.
>>> >>>
>>> >>> Currently, that’s represented with a PDone, but currently that
>>> doesn’t
>>> >>> allow any work to occur after it. I see a couple possible ways of
>>> handling
>>> >>> this given this conversation, and am curious which solution sounds
>>> like the
>>> >>> best way to deal with the problem:
>>> >>>
>>> >>> 1. Have output transforms always return something specific (which
>>> would
>>> >>> be the same across transforms by convention), that is in the form of
>>> a
>>> >>> PCollection, so operations can occur after it.
>>> >>>
>>> >>> 2. Make either PDone or some new type that can act as a PCollection
>>> so we
>>> >>> can run applies afterward.
>>> >>>
>>> >>> 3. Make output transforms provide the facility for a callback
>>> function
>>> >>> which runs after the transform is complete.
>>> >>>
>>> >>> I went through these gymnastics recently when I was trying to build
>>> >>> something that would move indices after writing to Algolia, and the
>>> solution
>>> >>> was to co-opt code from the old Sink class that used to exist in
>>> Beam. The
>>> >>> problem is that particular method requires the output transform in
>>> question
>>> >>> to return a PCollection, even if it is trivial or doesn’t make sense
>>> to
>>> >>> return one. This seems like a bad solution, but unfortunately there
>>> isn’t a
>>> >>> notion of a transform that has no explicit output that needs to have
>>> >>> operations occur after it.
>>> >>>
>>> >>> The three potential solutions above address this issue, but I would
>>> like
>>> >>> to hear on which would be preferable (or perhaps a different proposal
>>> >>> altogether?). Perhaps we could also start up a ticket on this, since
>>> it
>>> >>> seems like a worthwhile feature addition. I would find it really
>>> useful, for
>>> >>> one.
>>> >>>
>>> >>> Chet
>>> >>>
>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>> >>>
>>> >>> Instead of a callback fn, its most useful if a PCollection is
>>> returned
>>> >>> containing the result of the sink so that any arbitrary additional
>>> functions
>>> >>> can be applied.
>>> >>>
>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>> j...@nanthrax.net>
>>> >>> wrote:
>>> >>>>
>>> >>>> Agree, I would prefer to do the callback in the IO more than in the
>>> >>>> main.
>>> >>>>
>>> >>>> Regards
>>> >>>> JB
>>> >>>>
>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>> >>>>>
>>> >>>>> I do something almost exactly like this, but with BigtableIO
>>> instead.
>>> >>>>> I have a pull request open here [1] (which reminds me I need to
>>> finish this
>>> >>>>> up...).  It would really be nice for most IOs to support something
>>> like
>>> >>>>> this.
>>> >>>>>
>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output
>>> from
>>> >>>>> the BigtableIO, and then feed that into your function which will
>>> run when
>>> >>>>> all writes finish.
>>> >>>>>
>>> >>>>> You probably want to avoid doing something in the main method
>>> because
>>> >>>>> there's no guarantee it'll actually run (maybe the driver will
>>> die, get
>>> >>>>> killed, machine will explode, etc).
>>> >>>>>
>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>> >>>>>
>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com
>>> >>>>> <mailto:nerdyn...@gmail.com>> wrote:
>>> >>>>>
>>> >>>>>     Assuming you're in Java. You could just follow on in your Main
>>> >>>>> method.
>>> >>>>>     Checking the state of the Result.
>>> >>>>>
>>> >>>>>     Example:
>>> >>>>>     PipelineResult result = pipeline.run();
>>> >>>>>     try {
>>> >>>>>     result.waitUntilFinish();
>>> >>>>>     if(result.getState() == PipelineResult.State.DONE) {
>>> >>>>>     //DO ES work
>>> >>>>>     }
>>> >>>>>     } catch(Exception e) {
>>> >>>>>     result.cancel();
>>> >>>>>     throw e;
>>> >>>>>     }
>>> >>>>>
>>> >>>>>     Otherwise you could also use Oozie to construct a work flow.
>>> >>>>>
>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>> >>>>> <j...@nanthrax.net
>>> >>>>>     <mailto:j...@nanthrax.net>> wrote:
>>> >>>>>
>>> >>>>>         Hi,
>>> >>>>>
>>> >>>>>         yes, we had a similar question some days ago.
>>> >>>>>
>>> >>>>>         We can imagine to have a user callback fn fired when the
>>> sink
>>> >>>>> batch is
>>> >>>>>         complete.
>>> >>>>>
>>> >>>>>         Let me think about that.
>>> >>>>>
>>> >>>>>         Regards
>>> >>>>>         JB
>>> >>>>>
>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>> >>>>>
>>> >>>>>             Hey JB,
>>> >>>>>
>>> >>>>>             Thanks for getting back so quickly.
>>> >>>>>             I suppose in that case I would need a way of monitoring
>>> >>>>> when the ES
>>> >>>>>             transform completes successfully before I can proceed
>>> with
>>> >>>>> doing the
>>> >>>>>             swap.
>>> >>>>>             The problem with this is that I can't think of a good
>>> way
>>> >>>>> to
>>> >>>>>             determine that termination state short of polling the
>>> new
>>> >>>>> index to
>>> >>>>>             check the document count compared to the size of input
>>> >>>>> PCollection.
>>> >>>>>             That, or maybe I'd need to use an external system like
>>> you
>>> >>>>> mentioned
>>> >>>>>             to poll on the state of the pipeline (I'm using Google
>>> >>>>> Dataflow, so
>>> >>>>>             maybe there's a way to do this with some API).
>>> >>>>>             But I would have thought that there would be an easy
>>> way of
>>> >>>>> simply
>>> >>>>>             saying "do not process this transform until this other
>>> >>>>> transform
>>> >>>>>             completes".
>>> >>>>>             Is there no established way of "signaling" between
>>> >>>>> pipelines when
>>> >>>>>             some pipeline completes, or have some way of declaring
>>> a
>>> >>>>> dependency
>>> >>>>>             of 1 transform on another transform?
>>> >>>>>
>>> >>>>>             Thanks again,
>>> >>>>>             Philip
>>> >>>>>
>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>> >>>>>             <j...@nanthrax.net <mailto:j...@nanthrax.net>
>>> >>>>> <mailto:j...@nanthrax.net
>>> >>>>>
>>> >>>>>             <mailto:j...@nanthrax.net>>> wrote:
>>> >>>>>
>>> >>>>>                  Hi Philip,
>>> >>>>>
>>> >>>>>                  You won't be able to do (3) in the same pipeline
>>> as
>>> >>>>> the
>>> >>>>>             Elasticsearch Sink
>>> >>>>>                  PTransform ends the pipeline with PDone.
>>> >>>>>
>>> >>>>>                  So, (3) has to be done in another pipeline (using
>>> a
>>> >>>>> DoFn) or in
>>> >>>>>             another
>>> >>>>>                  "system" (like Camel for instance). I would do a
>>> check
>>> >>>>> of the
>>> >>>>>             data in the
>>> >>>>>                  index and then trigger the swap there.
>>> >>>>>
>>> >>>>>                  Regards
>>> >>>>>                  JB
>>> >>>>>
>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>> >>>>>
>>> >>>>>                      Hi,
>>> >>>>>
>>> >>>>>                      I'm pretty new to Beam, and I've been trying
>>> to
>>> >>>>> use the
>>> >>>>>             ElasticSearchIO
>>> >>>>>                      sink to write docs into ES.
>>> >>>>>                      With this, I want to be able to
>>> >>>>>                      1. ingest and transform rows from DB (done)
>>> >>>>>                      2. write JSON docs/strings into a new ES index
>>> >>>>> (done)
>>> >>>>>                      3. After (2) is complete and all documents are
>>> >>>>> written into
>>> >>>>>             a new index,
>>> >>>>>                      trigger an atomic index swap under an alias to
>>> >>>>> replace the
>>> >>>>>             current
>>> >>>>>                      aliased index with the new index generated in
>>> step
>>> >>>>> 2. This
>>> >>>>>             is basically
>>> >>>>>                      a single POST request to the ES cluster.
>>> >>>>>
>>> >>>>>                      The problem I'm facing is that I don't seem
>>> to be
>>> >>>>> able to
>>> >>>>>             find a way to
>>> >>>>>                      have a way for (3) to happen after step (2) is
>>> >>>>> complete.
>>> >>>>>
>>> >>>>>                      The ElasticSearchIO.Write transform returns a
>>> >>>>> PDone, and
>>> >>>>>             I'm not sure
>>> >>>>>                      how to proceed from there because it doesn't
>>> seem
>>> >>>>> to let me
>>> >>>>>             do another
>>> >>>>>                      apply on it to "define" a dependency.
>>> >>>>>
>>> >>>>>
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>>>>
>>> >>>>> <
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>> >>>
>>> >>>>>
>>> >>>>>                      Is there a recommended way to construct
>>> pipelines
>>> >>>>> workflows
>>> >>>>>             like this?
>>> >>>>>
>>> >>>>>                      Thanks in advance,
>>> >>>>>                      Philip
>>> >>>>>
>>> >>>>>
>>> >>>>>                  --     Jean-Baptiste Onofré
>>> >>>>>             jbono...@apache.org <mailto:jbono...@apache.org>
>>> >>>>>             <mailto:jbono...@apache.org <mailto:
>>> jbono...@apache.org>>
>>> >>>>>             http://blog.nanthrax.net
>>> >>>>>                  Talend - http://www.talend.com
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>         --         Jean-Baptiste Onofré
>>> >>>>>         jbono...@apache.org <mailto:jbono...@apache.org>
>>> >>>>>         http://blog.nanthrax.net
>>> >>>>>         Talend - http://www.talend.com
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>     --     Nick Verbeck - NerdyNick
>>> >>>>>     ----------------------------------------------------
>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>
>>> >>>> --
>>> >>>> Jean-Baptiste Onofré
>>> >>>> jbono...@apache.org
>>> >>>> http://blog.nanthrax.net
>>> >>>> Talend - http://www.talend.com
>>> >>>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>

Reply via email to