This sounds great!

On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:

> This would be absolutely great! It seems somewhat similar to the changes
> that were made to the BigQuery sink to support WriteResult (
> https://github.com/apache/beam/blob/master/sdks/java/io/
> google-cloud-platform/src/main/java/org/apache/beam/sdk/
> io/gcp/bigquery/WriteResult.java).
>
> I find it helpful to think about the different things that may come after
> a sink. For instance:
>
> 1. It might be helpful to have a collection of failing input elements. The
> type of failed elements is pretty straightforward -- just the input
> elements. This allows handling such failures by directing them elsewhere or
> performing additional processing.
>

BigQueryIO already does this as you point out.
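
To make point 1 concrete, here is a minimal plain-Java sketch of a write step that hands the failed input elements back to the caller. It is only a model of the idea, not the BigQueryIO or WriteResult API: `FailedWritesSketch`, `WriteOutcome`, and `writeAll` are hypothetical names, and the "destination" is just a predicate that accepts or rejects each element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model (not Beam): a write step that returns the elements it could not write. */
public class FailedWritesSketch {

  /** Hypothetical result type: count of written elements plus the rejected inputs. */
  public static final class WriteOutcome<T> {
    public final int written;
    public final List<T> failed;

    WriteOutcome(int written, List<T> failed) {
      this.written = written;
      this.failed = failed;
    }
  }

  /** Tries to write every element; tryWrite returns false when the destination rejects one. */
  public static <T> WriteOutcome<T> writeAll(List<T> input, Predicate<T> tryWrite) {
    int written = 0;
    List<T> failed = new ArrayList<>();
    for (T element : input) {
      if (tryWrite.test(element)) {
        written++;
      } else {
        failed.add(element); // keep the original input element for later handling
      }
    }
    return new WriteOutcome<>(written, failed);
  }

  public static void main(String[] args) {
    // Assumption for the demo: the destination rejects rows starting with "bad".
    WriteOutcome<String> outcome =
        writeAll(Arrays.asList("a", "bad-row", "b"), row -> !row.startsWith("bad"));
    System.out.println("written=" + outcome.written + " failed=" + outcome.failed);
  }
}
```

The failed list has the same element type as the input, so it can be redirected elsewhere or reprocessed without any sink-specific knowledge.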

>
> 2. For a sink that produces a series of files, it might be useful to have
> a collection of the file names that have been completely written. This
> allows performing additional handling on these completed segments.
>

In fact we already do this for FileBasedSinks. See
https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java

>
> 3. For a sink that updates some destination, it would be reasonable to
> have a collection that periodically provides output indicating how
> complete the information written to that destination is. For instance, this
> might be something like "<this bigquery table> has all of the elements up
> to <input watermark>". This allows tracking how much information
> has been completely written out.
>

Interesting. Maybe tough to do since sinks often don't have that knowledge.


>
> I think those concepts map to the more detailed description Eugene
> provided, but I find it helpful to focus on what information comes out of
> the sink and how it might be used.
>
> Are there any use cases the above misses? Any functionality that has been
> described that doesn't map to these use cases?
>
> -- Ben
>
> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> It makes sense to consider how this maps onto existing kinds of sinks.
>>
>> E.g.:
>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that
>> will emit 1 result per bundle (either a bogus value or number of records
>> written) that will be Combine'd into 1 result per pane of input. A user can
>> sequence against this and be notified when some intermediate amount of data
>> has been written for a window, or (via .isFinal()) when all of it has been
>> written.
>> - Something that e.g. initiates an import job, such as
>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>> index swap: should emit 1 result per import job, e.g. containing
>> information about the job (e.g. its id and statistics). Role of panes is
>> the same.
>> - Something like above but that supports dynamic destinations: like in
>> WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> where
>> ResultT may be something like a list of files that were written for this
>> pane of this destination.
>>
>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> I agree that the proper API for enabling the use case "do something
>>> after the data has been written" is to return a PCollection of objects
>>> where each object represents the result of writing some identifiable subset
>>> of the data. Then one can apply a ParDo to this PCollection, in order to
>>> "do something after this subset has been written".
>>>
>>> The challenging part here is *identifying* the subset of the data that's
>>> been written, in a way consistent with Beam's unified batch/streaming
>>> model, where saying "all data has been written" is not an option because
>>> more data can arrive.
>>>
>>> The next choice is "a window of input has been written", but then again,
>>> late data can arrive into a window as well.
>>>
>>> Next choice after that is "a pane of input has been written", but per
>>> https://s.apache.org/beam-sink-triggers the term "pane of input" is
>>> moot: triggering and panes should be something private to the sink, and the
>>> same input can trigger different sinks differently. The hypothetical
>>> different accumulation modes make this trickier still. I'm not sure whether
>>> we intend to also challenge the idea that windowing is inherent to the
>>> collection, or whether it too should be specified on a transform that
>>> processes the collection. I think for the sake of this discussion we can
>>> assume that it's inherent, and assume the mental model that the elements in
>>> different windows of a PCollection are processed independently - "as if"
>>> there were multiple pipelines processing each window.
>>>
>>> Overall, embracing the full picture, we end up with something like this:
>>> - The input PCollection is a composition of windows.
>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>> windows), the below applies to the entire contents of the PCollection. If
>>> it's merging (e.g. session windows), then it applies per-key, and the input
>>> should be (perhaps implicitly) keyed in a way that the sink understands -
>>> for example, the grouping by destination in DynamicDestinations in file and
>>> bigquery writes.
>>> - Each window's contents is a "changelog" - stream of elements and
>>> retractions.
>>> - A "sink" processes each window of the collection, deciding how to
>>> handle elements and retractions (and whether to support retractions at all)
>>> in a sink-specific way, and deciding *when* to perform the side effects for
>>> a portion of the changelog (a "pane") based on the sink's triggering
>>> strategy.
>>> - If the side effect itself is parallelized, then there'll be multiple
>>> results for the pane - e.g. one per bundle.
>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of
>>> filenames that have been written, or simply a number of records that was
>>> written, or a bogus void value etc. The result will implicitly include the
>>> window of the input it's associated with. It will also implicitly include
>>> pane information - index of the pane in this window, and whether this is
>>> the first or last pane.
>>> - The partitioning into bundles is an implementation detail and not very
>>> useful, so before presenting the pane write results to the user, the sink
>>> will probably want to Combine the bundle results so that there ends up
>>> being 1 value for each pane that was written. Once again note that panes
>>> may be associated with windows of the input as a whole, but if the input is
>>> keyed (like with DynamicDestinations) they'll be associated with per-key
>>> subsets of windows of the input.
>>> - This combining requires an extra, well, combining operation, so it
>>> should be optional.
>>> - The user will end up getting either a PCollection<ResultT> or a
>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
>>> the elements of this collection will implicitly have window and pane
>>> information, available via the implicit BoundedWindow and PaneInfo.
>>> - Until "sink triggering" is implemented, we'll have to embrace the fact
>>> that trigger strategy is set on the input. But in that case the user will
>>> have to accept that the PaneInfo of ResultT's is not necessarily directly
>>> related to panes of the input - the sink is allowed to do internal
>>> aggregation as an implementation detail, which may modify the triggering
>>> strategy. Basically the user will still get sink-assigned panes.
>>> - In most cases, one may imagine that the user is interested in being
>>> notified of "no more data associated with this window will be written", so
>>> the user will ignore all ResultT's except those where the pane is marked
>>> final. If a user is interested in being notified of intermediate write
>>> results - they'll have to embrace the fact that they cannot identify the
>>> precise subset of input associated with the intermediate result.
>>>
>>> I think the really key points of the above are:
>>> - Sinks should support windowed input. Sinks should write different
>>> windows of input independently. If the sink can write multi-destination
>>> input, the destination should function as a grouping key, and in that case
>>> merging windowing should be allowed.
>>> - Producing a PCollection of write results should be optional.
>>> - When asked to produce results, sinks produce a PCollection of results
>>> that may be keyed or unkeyed (per above), and are placed in the window of
>>> the input that was written, and have a PaneInfo assigned by the sink, of
>>> which probably the only part useful to the user is whether it's .isFinal().
>>>
>>> Does this sound reasonable?
>>>
>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> +1
>>>>
>>>> At the very least an empty PCollection<?> could be produced with no
>>>> promises about its contents but the ability to be followed (e.g. as a
>>>> side input), which is forward compatible with whatever actual metadata
>>>> one may decide to produce in the future.
>>>>
>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>> > +dev@
>>>> >
>>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>>> > understand and a good way for an IO to communicate and establish causal
>>>> > dependencies. Converting an IO from PDone to real output may spur further
>>>> > useful thoughts based on the design decisions about what sort of output is
>>>> > most useful.
>>>> >
>>>> > Kenn
>>>> >
>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>> >>
>>>> >> I think all sinks actually do have valuable information to output which
>>>> >> can be used after a write (file names, transaction/commit/row ids, table
>>>> >> names, ...). In addition to this metadata, having a PCollection of all
>>>> >> successful writes and all failed writes is useful for users so they can
>>>> >> chain an action which depends on what was or wasn't successfully written.
>>>> >> Users have requested adding retry/failure handling policies to sinks so that
>>>> >> failed writes don't jam up the pipeline.
>>>> >>
>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> I agree generally with the idea that returning a PCollection makes all
>>>> >>> of this easier, so that arbitrary additional functions can be added. But
>>>> >>> what exactly would write functions return in a PCollection that would make
>>>> >>> sense? The whole idea is that we’ve written to an external source and now
>>>> >>> the collection itself is no longer needed.
>>>> >>>
>>>> >>> Currently, that’s represented with a PDone, which doesn’t allow any work
>>>> >>> to occur after it. I see a couple of possible ways of handling this given
>>>> >>> this conversation, and am curious which solution sounds like the best way
>>>> >>> to deal with the problem:
>>>> >>>
>>>> >>> 1. Have output transforms always return something specific (which would
>>>> >>> be the same across transforms by convention), that is in the form of a
>>>> >>> PCollection, so operations can occur after it.
>>>> >>>
>>>> >>> 2. Make either PDone or some new type that can act as a PCollection so we
>>>> >>> can run applies afterward.
>>>> >>>
>>>> >>> 3. Make output transforms provide the facility for a callback function
>>>> >>> which runs after the transform is complete.
>>>> >>>
>>>> >>> I went through these gymnastics recently when I was trying to build
>>>> >>> something that would move indices after writing to Algolia, and the
>>>> >>> solution was to co-opt code from the old Sink class that used to exist in
>>>> >>> Beam. The problem is that this particular method requires the output
>>>> >>> transform in question to return a PCollection, even if it is trivial or
>>>> >>> doesn’t make sense to return one. This seems like a bad solution, but
>>>> >>> unfortunately there isn’t a notion of a transform that has no explicit
>>>> >>> output but still needs to have operations occur after it.
>>>> >>>
>>>> >>> The three potential solutions above address this issue, but I would like
>>>> >>> to hear which would be preferable (or perhaps a different proposal
>>>> >>> altogether?). Perhaps we could also start up a ticket on this, since it
>>>> >>> seems like a worthwhile feature addition. I would find it really useful,
>>>> >>> for one.
>>>> >>>
>>>> >>> Chet
>>>> >>>
>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>> >>>
>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned
>>>> >>> containing the result of the sink so that any arbitrary additional
>>>> >>> functions can be applied.
>>>> >>>
>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Agree, I would prefer to do the callback in the IO rather than in the
>>>> >>>> main method.
>>>> >>>>
>>>> >>>> Regards
>>>> >>>> JB
>>>> >>>>
>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>> >>>>>
>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead.
>>>> >>>>> I have a pull request open here [1] (which reminds me I need to finish
>>>> >>>>> this up...). It would really be nice for most IOs to support something
>>>> >>>>> like this.
>>>> >>>>>
>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>>>> >>>>> the BigtableIO, and then feed that into your function which will run
>>>> >>>>> when all writes finish.
>>>> >>>>>
>>>> >>>>> You probably want to avoid doing something in the main method because
>>>> >>>>> there's no guarantee it'll actually run (maybe the driver will die, get
>>>> >>>>> killed, machine will explode, etc).
>>>> >>>>>
>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>> >>>>>
>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>> >>>>>
>>>> >>>>>     Assuming you're in Java, you could just follow on in your main
>>>> >>>>>     method, checking the state of the result.
>>>> >>>>>
>>>> >>>>>     Example:
>>>> >>>>>     PipelineResult result = pipeline.run();
>>>> >>>>>     try {
>>>> >>>>>       // Block until the pipeline reaches a terminal state.
>>>> >>>>>       result.waitUntilFinish();
>>>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>> >>>>>         // DO ES work
>>>> >>>>>       }
>>>> >>>>>     } catch (Exception e) {
>>>> >>>>>       result.cancel();
>>>> >>>>>       throw e;
>>>> >>>>>     }
>>>> >>>>>
>>>> >>>>>     Otherwise you could also use Oozie to construct a work flow.
>>>> >>>>>
>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>> >>>>>     <j...@nanthrax.net> wrote:
>>>> >>>>>
>>>> >>>>>         Hi,
>>>> >>>>>
>>>> >>>>>         yes, we had a similar question some days ago.
>>>> >>>>>
>>>> >>>>>         We could imagine having a user callback fn fired when the
>>>> >>>>>         sink batch is complete.
>>>> >>>>>
>>>> >>>>>         Let me think about that.
>>>> >>>>>
>>>> >>>>>         Regards
>>>> >>>>>         JB
>>>> >>>>>
>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>> >>>>>
>>>> >>>>>             Hey JB,
>>>> >>>>>
>>>> >>>>>             Thanks for getting back so quickly.
>>>> >>>>>             I suppose in that case I would need a way of monitoring
>>>> >>>>>             when the ES transform completes successfully before I can
>>>> >>>>>             proceed with doing the swap.
>>>> >>>>>             The problem with this is that I can't think of a good way
>>>> >>>>>             to determine that termination state short of polling the
>>>> >>>>>             new index to check the document count compared to the size
>>>> >>>>>             of the input PCollection.
>>>> >>>>>             That, or maybe I'd need to use an external system like you
>>>> >>>>>             mentioned to poll on the state of the pipeline (I'm using
>>>> >>>>>             Google Dataflow, so maybe there's a way to do this with
>>>> >>>>>             some API).
>>>> >>>>>             But I would have thought that there would be an easy way
>>>> >>>>>             of simply saying "do not process this transform until this
>>>> >>>>>             other transform completes".
>>>> >>>>>             Is there no established way of "signaling" between
>>>> >>>>>             pipelines when some pipeline completes, or some way of
>>>> >>>>>             declaring a dependency of one transform on another
>>>> >>>>>             transform?
>>>> >>>>>
>>>> >>>>>             Thanks again,
>>>> >>>>>             Philip
>>>> >>>>>
>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>>> >>>>>             <j...@nanthrax.net> wrote:
>>>> >>>>>
>>>> >>>>>                  Hi Philip,
>>>> >>>>>
>>>> >>>>>                  You won't be able to do (3) in the same pipeline,
>>>> >>>>>                  because the Elasticsearch sink PTransform ends the
>>>> >>>>>                  pipeline with PDone.
>>>> >>>>>
>>>> >>>>>                  So, (3) has to be done in another pipeline (using a
>>>> >>>>>                  DoFn) or in another "system" (like Camel for
>>>> >>>>>                  instance). I would do a check of the data in the
>>>> >>>>>                  index and then trigger the swap there.
>>>> >>>>>
>>>> >>>>>                  Regards
>>>> >>>>>                  JB
>>>> >>>>>
>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>> >>>>>
>>>> >>>>>                      Hi,
>>>> >>>>>
>>>> >>>>>                      I'm pretty new to Beam, and I've been trying to
>>>> >>>>>                      use the ElasticsearchIO sink to write docs into
>>>> >>>>>                      ES.
>>>> >>>>>                      With this, I want to be able to
>>>> >>>>>                      1. ingest and transform rows from DB (done)
>>>> >>>>>                      2. write JSON docs/strings into a new ES index
>>>> >>>>>                      (done)
>>>> >>>>>                      3. After (2) is complete and all documents are
>>>> >>>>>                      written into a new index, trigger an atomic index
>>>> >>>>>                      swap under an alias to replace the current
>>>> >>>>>                      aliased index with the new index generated in
>>>> >>>>>                      step 2. This is basically a single POST request
>>>> >>>>>                      to the ES cluster.
>>>> >>>>>
>>>> >>>>>                      The problem I'm facing is that I can't seem to
>>>> >>>>>                      find a way to make (3) happen after step (2) is
>>>> >>>>>                      complete.
>>>> >>>>>
>>>> >>>>>                      The ElasticsearchIO.Write transform returns a
>>>> >>>>>                      PDone, and I'm not sure how to proceed from there
>>>> >>>>>                      because it doesn't seem to let me do another
>>>> >>>>>                      apply on it to "define" a dependency.
>>>> >>>>>
>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>> >>>>>
>>>> >>>>>                      Is there a recommended way to construct pipeline
>>>> >>>>>                      workflows like this?
>>>> >>>>>
>>>> >>>>>                      Thanks in advance,
>>>> >>>>>                      Philip
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>                  --
>>>> >>>>>                  Jean-Baptiste Onofré
>>>> >>>>>                  jbono...@apache.org
>>>> >>>>>                  http://blog.nanthrax.net
>>>> >>>>>                  Talend - http://www.talend.com
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>         --
>>>> >>>>>         Jean-Baptiste Onofré
>>>> >>>>>         jbono...@apache.org
>>>> >>>>>         http://blog.nanthrax.net
>>>> >>>>>         Talend - http://www.talend.com
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>> >>>>>     ----------------------------------------------------
>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>
>>>> >>>> --
>>>> >>>> Jean-Baptiste Onofré
>>>> >>>> jbono...@apache.org
>>>> >>>> http://blog.nanthrax.net
>>>> >>>> Talend - http://www.talend.com
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
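
For reference, the "one result per bundle, combined into one result per pane, act on the final pane" flow described earlier in the thread can be modeled in plain Java. This is a rough sketch under stated assumptions, not Beam code: `PaneResultsSketch`, `WriteResult`, and `combinePerPane` are hypothetical names, a window is represented by a plain string, and the record count stands in for whatever ResultT a real sink would emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam) of combining per-bundle write results into per-pane results. */
public class PaneResultsSketch {

  /** Hypothetical write result, tagged with the window and pane it belongs to. */
  public static final class WriteResult {
    public final String window;
    public final int paneIndex;
    public final boolean isFinal;
    public final long records;

    public WriteResult(String window, int paneIndex, boolean isFinal, long records) {
      this.window = window;
      this.paneIndex = paneIndex;
      this.isFinal = isFinal;
      this.records = records;
    }
  }

  /** Sums bundle-level results so that each (window, pane) ends up with exactly one result. */
  public static List<WriteResult> combinePerPane(List<WriteResult> bundleResults) {
    Map<String, WriteResult> combined = new LinkedHashMap<>();
    for (WriteResult b : bundleResults) {
      String key = b.window + "/" + b.paneIndex;
      WriteResult soFar = combined.get(key);
      long total = (soFar == null ? 0 : soFar.records) + b.records;
      combined.put(key, new WriteResult(b.window, b.paneIndex, b.isFinal, total));
    }
    return new ArrayList<>(combined.values());
  }

  public static void main(String[] args) {
    List<WriteResult> bundles = Arrays.asList(
        new WriteResult("w1", 0, false, 10),
        new WriteResult("w1", 0, false, 5),
        new WriteResult("w1", 1, true, 7));
    for (WriteResult r : combinePerPane(bundles)) {
      if (r.isFinal) {
        // A follow-up step (e.g. an index swap) would key off the final pane only.
        System.out.println(r.window + ": final pane written, follow-up can run");
      } else {
        System.out.println(r.window + " pane " + r.paneIndex + ": " + r.records + " records so far");
      }
    }
  }
}
```

A consumer that only cares about "no more data for this window will be written" ignores every result except the one marked final; intermediate results say how much has been written but not which subset of the input it was.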
