This sounds great!

On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:
> This would be absolutely great! It seems somewhat similar to the changes
> that were made to the BigQuery sink to support WriteResult
> (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>
> I find it helpful to think about the different things that may come after
> a sink. For instance:
>
> 1. It might be helpful to have a collection of failed input elements. The
> type of failed elements is pretty straightforward -- just the input
> elements. This allows handling such failures by directing them elsewhere
> or performing additional processing.

BigQueryIO already does this, as you point out.

> 2. For a sink that produces a series of files, it might be useful to have
> a collection of the file names that have been completely written. This
> allows performing additional handling on these completed segments.

In fact we already do this for FileBasedSinks (see the sketch after this
message):
https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java

> 3. For a sink that updates some destination, it would be reasonable to
> have a collection that provides (periodic) output indicating how complete
> the information written to that destination is. For instance, this might
> be something like "<this bigquery table> has all of the elements up to
> <input watermark>". This allows tracking how much information has been
> completely written out.

Interesting. Maybe tough to do, since sinks often don't have that knowledge.

> I think those concepts map to the more detailed description Eugene
> provided, but I find it helpful to focus on what information comes out of
> the sink and how it might be used.
>
> Were there any use cases the above miss? Any functionality that has been
> described that doesn't map to these use cases?
>
> -- Ben
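A minimal sketch of Ben's use case 2 against that API, assuming a Beam version where TextIO exposes withOutputFilenames() (which makes the write return the WriteFilesResult linked above); the follow-up DoFn body is a placeholder:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Values;

    // lines is a PCollection<String> built earlier in the pipeline.
    WriteFilesResult<Void> result =
        lines.apply(TextIO.write().to("/tmp/out").withOutputFilenames());

    result
        .getPerDestinationOutputFilenames()   // PCollection<KV<Void, String>>
        .apply(Values.<String>create())
        .apply(ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Runs once per completely-written file, e.g. to hand the
            // filename to a downstream system.
          }
        }));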
On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:

It makes sense to consider how this maps onto existing kinds of sinks. E.g.:

- Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written), which will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.

- Something that initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (its id and statistics). The role of panes is the same.

- Something like the above, but supporting dynamic destinations: as in WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>>, where ResultT may be something like the list of files that were written for this pane of this destination.

On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:

I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection in order to "do something after this subset has been written".

The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option, because more data can arrive.

The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.

The choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and adopt the mental model that the elements in different windows of a PCollection are processed independently -- "as if" there were multiple pipelines processing each window.

Overall, embracing the full picture, we end up with something like this (a sketch of the final-pane filtering follows this list):

- The input PCollection is a composition of windows.
- If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), it applies per key, and the input should be (perhaps implicitly) keyed in a way that the sink understands -- for example, the grouping by destination in DynamicDestinations in the file and BigQuery writes.
- Each window's contents are a "changelog": a stream of elements and retractions.
- A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
- If the side effect itself is parallelized, there will be multiple results for the pane -- e.g. one per bundle.
- Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records written, or a bogus void value. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information: the index of the pane within this window, and whether this is the first or last pane.
- The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Note again that panes may be associated with windows of the input as a whole, but if the input is keyed (as with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
- This combining requires an extra, well, combining operation, so it should be optional.
- The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection implicitly carry window and pane information, available via the implicit BoundedWindow and PaneInfo.
- Until "sink triggering" is implemented, we'll have to embrace the fact that the trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of ResultT's is not necessarily directly related to panes of the input -- the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically, the user will still get sink-assigned panes.
- In most cases, one may imagine that the user is interested in being notified that "no more data associated with this window will be written", so the user will ignore all ResultT's except those whose pane is marked final. A user interested in intermediate write results will have to accept that they cannot identify the precise subset of input associated with an intermediate result.
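A minimal sketch of that final-pane filtering, assuming a sink that already returns some PCollection of results (ResultT is a stand-in type parameter); note that the Java SDK spells the check PaneInfo.isLast() rather than isFinal():

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Keep only results from the final pane of each window.
    static <ResultT> PCollection<ResultT> finalPanesOnly(PCollection<ResultT> results) {
      return results.apply(ParDo.of(new DoFn<ResultT, ResultT>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // PaneInfo.isLast() marks the last pane of the window, i.e.
          // no more data for this window will be written.
          if (c.pane().isLast()) {
            c.output(c.element());
          }
        }
      }));
    }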
I think the really key points of the above are:

- Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
- Producing a PCollection of write results should be optional.
- When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per the above), placed in the window of the input that was written, with a PaneInfo assigned by the sink -- of which probably the only part useful to the user is whether it's final.

Does this sound reasonable?

On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:

+1

At the very least, an empty PCollection<?> could be produced with no promises about its contents but with the ability to be followed (e.g. as a side input -- see the sketch below), which is forward compatible with whatever actual metadata one may decide to produce in the future.

On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:

+dev@

I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.

Kenn

On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:

I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful, so that users can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure-handling policies to sinks so that failed writes don't jam up the pipeline.
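A minimal sketch of the side-input sequencing Robert describes, with hypothetical names (writeResults is the sink's output, assumed here to be a PCollection<Void>; followUp is whatever collection must wait for the writes):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollectionView;

    final PCollectionView<Iterable<Void>> writesDone =
        writeResults.apply(View.<Void>asIterable());

    followUp.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // The contents are ignored; reading the side input is what forces
        // this step to wait until the writes' pane for the window is ready.
        c.sideInput(writesDone);
        c.output(c.element());
      }
    }).withSideInputs(writesDone));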
On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:

So I agree generally with the idea that returning a PCollection makes all of this easier, so that arbitrary additional functions can be added -- but what exactly would write functions return in a PCollection that would make sense? The whole idea is that we've written to an external source and now the collection itself is no longer needed.

Currently that's represented with a PDone, which doesn't allow any work to occur after it. I see a couple of possible ways of handling this, given this conversation, and am curious which sounds like the best way to deal with the problem:

1. Have output transforms always return something specific (which would be the same across transforms by convention), in the form of a PCollection, so operations can occur after it.

2. Make either PDone, or some new type, able to act as a PCollection, so we can run applies afterward.

3. Make output transforms provide the facility for a callback function which runs after the transform is complete.

I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output yet still needs operations to occur after it.

The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.

Chet

On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:

Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink, so that any arbitrary additional functions can be applied.

On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Agreed. I would prefer to do the callback in the IO rather than in the main.

Regards,
JB

On 12/01/2017 03:54 PM, Steve Niemitz wrote:

I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish it up...). It would really be nice for most IOs to support something like this.

Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function, which will run when all writes finish. A rough sketch of that pattern follows.
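A sketch of that pattern with hypothetical names (writeResults stands in for whatever per-write output the patched BigtableIO emits):

    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    static <T> void onAllWritesDone(PCollection<T> writeResults) {
      writeResults
          // Collapse the per-write outputs to one count per window pane;
          // withoutDefaults() is required when globally combining windowed input.
          .apply(Combine.globally(Count.<T>combineFn()).withoutDefaults())
          .apply(ParDo.of(new DoFn<Long, Void>() {
            @ProcessElement
            public void process(ProcessContext c) {
              // Fires once per pane, after all writes feeding the count have
              // finished -- the "run when all writes finish" hook.
            }
          }));
    }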
You probably want to avoid doing something in the main method, because there's no guarantee it'll actually run (maybe the driver will die, get killed, the machine will explode, etc.).

[1] https://github.com/apache/beam/pull/3997

On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:

Assuming you're in Java, you could just follow on in your main method, checking the state of the PipelineResult. Example:

    PipelineResult result = pipeline.run();
    try {
      result.waitUntilFinish();
      if (result.getState() == PipelineResult.State.DONE) {
        // Do ES work
      }
    } catch (Exception e) {
      result.cancel();
      throw e;
    }

Otherwise you could also use Oozie to construct a workflow.

On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi,

Yes, we had a similar question some days ago.

We can imagine having a user callback fn fired when the sink batch is complete.

Let me think about that.

Regards,
JB

On 12/01/2017 09:04 AM, Philip Chan wrote:

Hey JB,

Thanks for getting back so quickly.

I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap. The problem with this is that I can't think of a good way to determine that termination state, short of polling the new index to check the document count compared to the size of the input PCollection. That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).

But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes". Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another?

Thanks again,
Philip

On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Philip,

You won't be able to do (3) in the same pipeline, as the Elasticsearch sink PTransform ends the pipeline with PDone.

So (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel, for instance). I would do a check of the data in the index and then trigger the swap there.

Regards,
JB
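For reference, the swap JB mentions is a single call to Elasticsearch's _aliases endpoint. A driver-side sketch in the spirit of NerdyNick's example above (host, index, and alias names are illustrative):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Runs after waitUntilFinish() reports DONE: atomically repoint the alias.
    String body = "{\"actions\": ["
        + "{\"remove\": {\"index\": \"docs_v1\", \"alias\": \"docs\"}},"
        + "{\"add\": {\"index\": \"docs_v2\", \"alias\": \"docs\"}}]}";
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://es-host:9200/_aliases").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    conn.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));
    if (conn.getResponseCode() != 200) {
      throw new IOException("Alias swap failed: HTTP " + conn.getResponseCode());
    }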
On 12/01/2017 08:41 AM, Philip Chan wrote:

Hi,

I'm pretty new to Beam, and I've been trying to use the ElasticsearchIO sink to write docs into ES. With this, I want to be able to:

1. Ingest and transform rows from a DB (done).
2. Write JSON docs/strings into a new ES index (done).
3. After (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.

The problem I'm facing is that I don't seem to be able to find a way for (3) to happen after step (2) is complete.

The ElasticsearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there, because it doesn't seem to let me do another apply on it to "define" a dependency:
https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html

Is there a recommended way to construct pipeline workflows like this?
Thanks in advance,
Philip

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

--
Nick Verbeck - NerdyNick
----------------------------------------------------
NerdyNick.com
TrailsOffroad.com
NoKnownBoundaries.com