I think it is also well-defined for the on-time (watermark) pane, as there is always only one such pane. In general I think sequencing is well-defined for three panes:
1. The first pane fired (this is how side inputs are sequenced)
2. The first on-time pane, as there is only one.
3. The final pane.

On Thu, Dec 21, 2017 at 5:49 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> Yeah. And I don't think there's a good way to define what sequencing even means, if the sink is returning results in windows that aren't gonna have a final pane.
>
> On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:
>> This is only for "final pane" waiting, correct? So someone who writes a sink in the global window probably would not want to use this.
>>
>> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>> PR is out: https://github.com/apache/beam/pull/4301
>>>
>>> This should allow us to have useful sequencing for sinks like BigtableIO / BigQueryIO.
>>>
>>> Adding a couple of interested parties:
>>> - Steve, would you be interested in using this in https://github.com/apache/beam/pull/3997 ?
>>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 - in particular, this works properly in the case where the input can fire multiple times.
>>>
>>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> I figured out the Never.ever() approach and it seems to work. Will finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this will be quite a useful transform.
>>>>
>>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> I'm a bit confused by all of these suggestions: they sound plausible at a high level, but I'm having a hard time making any one of them concrete.
>>>>>
>>>>> So suppose we want to create a transform Wait.on(PCollection<?> signal): PCollection<T> -> PCollection<T>.
>>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to "a", but buffers panes of "a" in any given window until the final pane of "sig" in the same window is fired (or, if it's never fired, until the window closes? could use a deadletter for that maybe). A rough usage sketch is inlined further below.
>>>>>
>>>>> This transform I suppose would need to have a keyed and an unkeyed version.
>>>>>
>>>>> The keyed version would support merging window fns, and would require "a" and "sig" to be keyed by the same key, and would work using a CoGbk - followed by a stateful ParDo? Or is there a way to get away without a stateful ParDo here? (not all runners support it)
>>>>>
>>>>> The unkeyed version would not support merging window fns. Reuven, can you elaborate on how your combiner idea would work here - in particular, what do you mean by "triggering only on the final pane"? Do you mean filtering out non-final panes before entering the combiner? I wonder if that'll work, probably worth a shot. And Kenn, can you elaborate on "re-trigger on the side input with a Never.ever() trigger"?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>> This is an interesting point.
>>>>>>
>>>>>> In the past, we've often just thought about sequencing some action to take place after the sink, in which case you can simply use the sink output as a main input. However, if you want to run a transform with another PCollection as a main input, this doesn't work. And as you've discovered, triggered side inputs are defined to be non-deterministic, and there's no way to make things line up.
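>>>>> The usage sketch mentioned above - minimal and hypothetical (MySink and AfterWriteFn are illustrative stand-ins, not a real API):
>>>>>
>>>>> // Hypothetical sink returning one result per pane it writes.
>>>>> PCollection<Void> signal = data.apply(new MySink());
>>>>> // Panes of "data" in a window are buffered until the final pane of
>>>>> // "signal" in the same window fires.
>>>>> PCollection<String> held = data.apply(Wait.on(signal));
>>>>> // So this ParDo runs only after the write for that window has completed.
>>>>> held.apply(ParDo.of(new AfterWriteFn()));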
>>>>>>
>>>>>> What you're describing only makes sense if you're blocking against the final pane (since otherwise there's no reasonable way to match up somePC panes with the sink panes). There are multiple ways you can do this: one would be to CoGBK the two PCollections together, and trigger the new transform only on the final pane. Another would be to add a combiner that returns a Void, triggering only on the final pane, and then make this singleton Void a side input. You could also do something explicit with the state API.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>>>
>>>>>>> Suppose we have a PCollection "donePanes" with an element per fully-processed pane: e.g. a BigQuery sink, and elements saying "a pane of data has been written; this pane is: final / non-final".
>>>>>>>
>>>>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) happens only after the final pane has been written.
>>>>>>>
>>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInputs(c)) to happen when c emits a *final* pane.
>>>>>>>
>>>>>>> Unfortunately, using ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do the trick: the side input becomes ready the moment *the first* pane of data has been written.
>>>>>>>
>>>>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final panes...).apply(View.asSingleton())). It also becomes ready the moment *the first* pane has been written; you just get an exception if you access the side input before the *final* pane was written.
>>>>>>>
>>>>>>> I can't think of a pure-Beam solution to this: either "donePanes" will be used as a main input to something (and then everything else can only be a side input, which is not general enough), or it will be used as a side input (and then we can't achieve "trigger only after the final pane fires").
>>>>>>>
>>>>>>> It seems that we need a way to control the side input pushback, and configure whether a view becomes ready when its first pane has fired or when its last pane has fired. I could see this being a property on the View transform itself. In terms of implementation - I tried to figure out how side input readiness is determined, in the direct runner and the Dataflow runner, and I'm completely lost and would appreciate some help.
>>>>>>>
>>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>> This sounds great!
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:
>>>>>>>>> This would be absolutely great! It seems somewhat similar to the changes that were made to the BigQuery sink to support WriteResult (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>>>>>
>>>>>>>>> I find it helpful to think about the different things that may come after a sink. For instance:
>>>>>>>>>
>>>>>>>>> 1. It might be helpful to have a collection of failing input elements.
>>>>>>>>> The type of failed elements is pretty straightforward -- just the input elements. This allows handling such failures by directing them elsewhere or performing additional processing.
>>>>>>>>
>>>>>>>> BigQueryIO already does this as you point out.
>>>>>>>>>
>>>>>>>>> 2. For a sink that produces a series of files, it might be useful to have a collection of the file names that have been completely written. This allows performing additional handling on these completed segments.
>>>>>>>>
>>>>>>>> In fact we already do this for FileBasedSinks. See https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>>>>
>>>>>>>>> 3. For a sink that updates some destination, it would be reasonable to have a collection that provides (periodically) output indicating how complete the information written to that destination is. For instance, this might be something like "<this bigquery table> has all of the elements up to <input watermark>". This allows tracking how much information has been completely written out.
>>>>>>>>
>>>>>>>> Interesting. Maybe tough to do since sinks often don't have that knowledge.
>>>>>>>>>
>>>>>>>>> I think those concepts map to the more detailed description Eugene provided, but I find it helpful to focus on what information comes out of the sink and how it might be used.
>>>>>>>>>
>>>>>>>>> Are there any use cases the above misses? Any functionality that has been described that doesn't map to these use cases?
>>>>>>>>>
>>>>>>>>> -- Ben
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>> It makes sense to consider how this maps onto existing kinds of sinks.
>>>>>>>>>>
>>>>>>>>>> E.g.:
>>>>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written) that will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written. A rough sketch of this bundle-to-pane aggregation follows below.
>>>>>>>>>> - Something that e.g. initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (e.g. its id and statistics). The role of panes is the same.
>>>>>>>>>> - Something like the above but that supports dynamic destinations: like in WriteFiles, the result will be PCollection<KV<DestinationT, ResultT>> where ResultT may be something like a list of files that were written for this pane of this destination.
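>>>>>>>>>> The sketch for the first case (WriteFn here is a hypothetical per-record RPC DoFn that emits one count per bundle):
>>>>>>>>>>
>>>>>>>>>> // One element per bundle: the number of records that bundle wrote.
>>>>>>>>>> PCollection<Long> perBundle = records.apply(ParDo.of(new WriteFn()));
>>>>>>>>>> // Sum the per-bundle counts. withoutDefaults() makes the combine emit
>>>>>>>>>> // one value per pane of each input window, so a user can sequence
>>>>>>>>>> // against it and watch for the final pane.
>>>>>>>>>> PCollection<Long> perPane =
>>>>>>>>>>     perBundle.apply(Combine.globally(Sum.ofLongs()).withoutDefaults());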
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>> I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection, in order to "do something after this subset has been written".
>>>>>>>>>>>
>>>>>>>>>>> The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.
>>>>>>>>>>>
>>>>>>>>>>> The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.
>>>>>>>>>>>
>>>>>>>>>>> The next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.
>>>>>>>>>>>
>>>>>>>>>>> Overall, embracing the full picture, we end up with something like this:
>>>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per-key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in file and bigquery writes.
>>>>>>>>>>> - Each window's contents are a "changelog" - a stream of elements and retractions.
>>>>>>>>>>> - A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
>>>>>>>>>>> - If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
>>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records that were written, or a bogus void value, etc.
>>>>>>>>>>> The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - index of the pane in this window, and whether this is the first or last pane.
>>>>>>>>>>> - The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
>>>>>>>>>>> - This combining requires an extra, well, combining operation, so it should be optional.
>>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the fact that trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of ResultT's is not necessarily directly related to panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically the user will still get sink-assigned panes.
>>>>>>>>>>> - In most cases, one may imagine that the user is interested in being notified of "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results - they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.
>>>>>>>>>>>
>>>>>>>>>>> I think the really key points of the above are:
>>>>>>>>>>> - Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
>>>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>>>> - When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per above), and are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal().
>>>>>>>>>>>
>>>>>>>>>>> Does this sound reasonable?
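>>>>>>>>>>> P.S. To illustrate the last key point, the consuming side might look roughly like this (a sketch only - "writeResults" and ResultT stand for the sink-specific output proposed above, not an existing API; the current PaneInfo accessor for "final" is isLast()):
>>>>>>>>>>>
>>>>>>>>>>> writeResults.apply(ParDo.of(new DoFn<ResultT, Void>() {
>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>   public void process(ProcessContext c) {
>>>>>>>>>>>     if (c.pane().isLast()) {
>>>>>>>>>>>       // No more data associated with this window will be written;
>>>>>>>>>>>       // safe to kick off follow-up work here.
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>> }));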
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>> +1
>>>>>>>>>>>>
>>>>>>>>>>>> At the very least an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward-compatible with whatever actual metadata one may decide to produce in the future.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>> > +dev@
>>>>>>>>>>>> >
>>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Kenn
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful for users so they can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure handling policies to sinks so that failed writes don't jam up the pipeline.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all of this easier so that arbitrary additional functions can be added. But what exactly would write functions return in a PCollection that would make sense? The whole idea is that we've written to an external system and now the collection itself is no longer needed.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Currently, that's represented with a PDone, which doesn't allow any work to occur after it. I see a couple of possible ways of handling this given this conversation, and am curious which solution sounds like the best way to deal with the problem:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 1. Have output transforms always return something specific (which would be the same across transforms by convention), that is in the form of a PCollection, so operations can occur after it.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a PCollection so we can run applies afterward.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function which runs after the transform is complete.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that approach requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output but still needs to have operations occur after it.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Chet
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink so that any arbitrary additional functions can be applied.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO rather than in the main method.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Regards
>>>>>>>>>>>> >>>> JB
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish this up...). It would really be nice for most IOs to support something like this.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function which will run when all writes finish.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main method because there's no guarantee it'll actually run (maybe the driver will die, get killed, machine will explode, etc).
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Assuming you're in Java, you could just follow on in your main method, checking the state of the Result.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Example:
>>>>>>>>>>>> >>>>> PipelineResult result = pipeline.run();
>>>>>>>>>>>> >>>>> try {
>>>>>>>>>>>> >>>>>   result.waitUntilFinish();
>>>>>>>>>>>> >>>>>   if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>>> >>>>>     // do the ES work
>>>>>>>>>>>> >>>>>   }
>>>>>>>>>>>> >>>>> } catch (Exception e) {
>>>>>>>>>>>> >>>>>   result.cancel();
>>>>>>>>>>>> >>>>>   throw e;
>>>>>>>>>>>> >>>>> }
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a workflow.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Hi,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> yes, we had a similar question some days ago.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> We can imagine having a user callback fn fired when the sink batch is complete.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Let me think about that.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Regards
>>>>>>>>>>>> >>>>> JB
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Hey JB,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Thanks for getting back so quickly.
>>>>>>>>>>>> >>>>> I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap.
>>>>>>>>>>>> >>>>> The problem with this is that I can't think of a good way to determine that termination state short of polling the new index to check the document count compared to the size of the input PCollection.
>>>>>>>>>>>> >>>>> That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).
>>>>>>>>>>>> >>>>> But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes".
>>>>>>>>>>>> >>>>> Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another?
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Thanks again,
>>>>>>>>>>>> >>>>> Philip
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Hi Philip,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> You won't be able to do (3) in the same pipeline, as the Elasticsearch Sink PTransform ends the pipeline with PDone.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> So, (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel for instance). I would do a check of the data in the index and then trigger the swap there.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Regards
>>>>>>>>>>>> >>>>> JB
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Hi,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> I'm pretty new to Beam, and I've been trying to use the ElasticSearchIO sink to write docs into ES.
>>>>>>>>>>>> >>>>> With this, I want to be able to
>>>>>>>>>>>> >>>>> 1. ingest and transform rows from a DB (done)
>>>>>>>>>>>> >>>>> 2. write JSON docs/strings into a new ES index (done)
>>>>>>>>>>>> >>>>> 3. After (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The problem I'm facing is that I don't seem to be able to find a way for (3) to happen after step (2) is complete.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there because it doesn't seem to let me do another apply on it to "define" a dependency:
>>>>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Is there a recommended way to construct pipeline workflows like this?
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Thanks in advance,
>>>>>>>>>>>> >>>>> Philip
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> --
>>>>>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> --
>>>>>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> --
>>>>>>>>>>>> >>>>> Nick Verbeck - NerdyNick
>>>>>>>>>>>> >>>>> ----------------------------------------------------
>>>>>>>>>>>> >>>>> NerdyNick.com
>>>>>>>>>>>> >>>>> TrailsOffroad.com
>>>>>>>>>>>> >>>>> NoKnownBoundaries.com
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> --
>>>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>>>> >>>> Talend - http://www.talend.com