Yeah. And I don't think there's a good way to define what sequencing even means if the sink is returning results in windows that are never going to have a final pane.
On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:

This is only for "final pane" waiting, correct? So someone who writes a sink in the global window probably would not want to use this.

On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

PR is out: https://github.com/apache/beam/pull/4301

This should allow us to have useful sequencing for sinks like BigtableIO / BigQueryIO.

Adding a couple of interested parties:
- Steve, would you be interested in using this in https://github.com/apache/beam/pull/3997 ?
- Mairbek: this should help in https://github.com/apache/beam/pull/4264 - in particular, this works properly in case the input can be firing multiple times.

On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:

I figured out the Never.ever() approach and it seems to work. Will finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this will be quite a useful transform.

On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:

I'm a bit confused by all of these suggestions: they sound plausible at a high level, but I'm having a hard time making any one of them concrete.

So suppose we want to create a transform Wait.on(PCollection<?> signal): PCollection<T> -> PCollection<T>. a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to "a", but buffers panes of "a" in any given window until the final pane of "sig" in the same window is fired (or, if it's never fired, until the window closes? could use a deadletter for that maybe).

This transform I suppose would need to have a keyed and unkeyed version.

The keyed version would support merging window fns, would require "a" and "sig" to be keyed by the same key, and would work using a CoGbk - followed by a stateful ParDo? Or is there a way to get away without a stateful ParDo here? (Not all runners support it.)

The unkeyed version would not support merging window fns. Reuven, can you elaborate on how your combiner idea would work here - in particular, what do you mean by "triggering only on the final pane"? Do you mean filtering out non-final panes before entering the combiner? I wonder if that'll work; probably worth a shot. And Kenn, can you elaborate on "re-trigger on the side input with a Never.ever() trigger"?

Thanks.
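For concreteness, a minimal sketch of how the Wait.on transform described above might look in use once the PR lands (the API is the one proposed in this thread; Create.of stands in both for real data and for the result collection a sink would emit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    public class WaitOnSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> a = p.apply("Main", Create.of("x", "y", "z"));

        // Stand-in for the per-pane results emitted by a sink.
        PCollection<String> sig = p.apply("Signal", Create.of("done"));

        // Identical to "a", but panes of "a" in each window are buffered
        // until the final pane of "sig" in the same window has fired.
        PCollection<String> sequenced = a.apply(Wait.on(sig));

        // Anything applied to "sequenced" is thereby sequenced after "sig".
        p.run().waitUntilFinish();
      }
    }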
On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:

This is an interesting point.

In the past, we've often just thought about sequencing some action to take place after the sink, in which case you can simply use the sink output as a main input. However, if you want to run a transform with another PCollection as a main input, this doesn't work. And as you've discovered, triggered side inputs are defined to be non-deterministic, and there's no way to make things line up.

What you're describing only makes sense if you're blocking against the final pane (since otherwise there's no reasonable way to match up somePC panes with the sink panes). There are multiple ways you can do this: one would be to CGBK the two PCollections together, and trigger the new transform only on the final pane. Another would be to add a combiner that returns a Void, triggering only on the final pane, and then make this singleton Void a side input. You could also do something explicit with the state API.

Reuven
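A rough sketch of that second suggestion, as far as it goes (Count.globally plays the role of the bogus-value combiner; as Eugene's message below points out, the open question is making the view become ready only on the *final* pane rather than the first):

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    /** Holds back somePc behind a singleton side input derived from writeResults. */
    static PCollection<String> afterWrite(
        PCollection<String> somePc, PCollection<Long> writeResults) {
      // Collapse all write results to a single value per window and view it
      // as a singleton side input. (For non-global windows, the Combine would
      // likely need .withoutDefaults().)
      final PCollectionView<Long> doneView =
          writeResults.apply(Count.<Long>globally()).apply(View.<Long>asSingleton());

      return somePc.apply("AfterWrite", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.sideInput(doneView); // pushes back until the side input is ready
          c.output(c.element());
        }
      }).withSideInputs(doneView));
    }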
On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

So this appears not as easy as anticipated (surprise!).

Suppose we have a PCollection "donePanes" with an element per fully-processed pane: e.g. a BigQuery sink, and elements saying "a pane of data has been written; this pane is: final / non-final".

Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) happens only after the final pane has been written.

In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen when c emits a *final* pane.

Unfortunately, using ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do the trick: the side input becomes ready the moment *the first* pane of data has been written.

But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final panes...).apply(View.asSingleton())). It also becomes ready the moment *the first* pane has been written; you just get an exception if you access the side input before the *final* pane was written.

I can't think of a pure-Beam solution to this: either "donePanes" will be used as a main input to something (and then everything else can only be a side input, which is not general enough), or it will be used as a side input (and then we can't achieve "trigger only after the final pane fires").

It seems that we need a way to control the side input pushback, and configure whether a view becomes ready when its first pane has fired or when its last pane has fired. I could see this being a property on the View transform itself. In terms of implementation - I tried to figure out how side input readiness is determined, in the direct runner and the Dataflow runner, and I'm completely lost and would appreciate some help.

On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote, replying inline to Ben Chambers' message of Mon, Dec 4, 2017 at 4:34 PM:

This sounds great!

> This would be absolutely great! It seems somewhat similar to the changes that were made to the BigQuery sink to support WriteResult (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>
> I find it helpful to think about the different things that may come after a sink. For instance:
>
> 1. It might be helpful to have a collection of failing input elements. The type of failed elements is pretty straightforward -- just the input elements. This allows handling such failures by directing them elsewhere or performing additional processing.

BigQueryIO already does this, as you point out.

> 2. For a sink that produces a series of files, it might be useful to have a collection of the file names that have been completely written. This allows performing additional handling on these completed segments.

In fact we already do this for FileBasedSinks. See https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java

> 3. For a sink that updates some destination, it would be reasonable to have a collection that provides (periodic) output indicating how complete the information written to that destination is. For instance, this might be something like "<this bigquery table> has all of the elements up to <input watermark>". This allows tracking how much information has been completely written out.

Interesting. Maybe tough to do, since sinks often don't have that knowledge.

> I think those concepts map to the more detailed description Eugene provided, but I find it helpful to focus on what information comes out of the sink and how it might be used.
>
> Were there any use cases the above miss? Any functionality that has been described that doesn't map to these use cases?
>
> -- Ben

On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:

It makes sense to consider how this maps onto existing kinds of sinks. E.g.:

- Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written), which will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.
- Something that initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (e.g. its id and statistics). The role of panes is the same.
- Something like the above but supporting dynamic destinations: as in WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>> where ResultT may be something like the list of files that were written for this pane of this destination.
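As a concrete illustration of that last per-destination shape, a small sketch of consuming the filenames from the WriteFilesResult linked above (a helper method; the Void destination and names here are illustrative, and the generics may differ across Beam versions):

    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;

    /** Applies a follow-up action to each completely written file. */
    static PCollection<String> handleFinishedFiles(WriteFilesResult<Void> result) {
      return result
          .getPerDestinationOutputFilenames() // PCollection<KV<Void, String>>
          .apply(Values.<String>create())
          .apply("OnFinishedFile", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              // e.g. move the file, register it in a catalog, notify a service...
              c.output(c.element());
            }
          }));
    }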
On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:

I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection, in order to "do something after this subset has been written".

The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.

The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.

The next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.

Overall, embracing the full picture, we end up with something like this:

- The input PCollection is a composition of windows.
- If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in the file and BigQuery writes.
- Each window's contents is a "changelog" - a stream of elements and retractions.
- A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
- If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
- Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records that was written, or a bogus void value, etc. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - the index of the pane in this window, and whether this is the first or last pane.
- The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again, note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
- This combining requires an extra, well, combining operation, so it should be optional.
- The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
- Until "sink triggering" is implemented, we'll have to embrace the fact that the trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of the ResultT's is not necessarily directly related to panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically, the user will still get sink-assigned panes.
- In most cases, one may imagine that the user is interested in being notified that "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results, they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.

I think the really key points of the above are:

- Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
- Producing a PCollection of write results should be optional.
- When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per the above), are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal().

Does this sound reasonable?
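To make that last point concrete, a sketch of a consumer keeping only final-pane results, using today's PaneInfo API (PaneInfo.isLast() standing in for the .isFinal() above; ResultT is String for illustration):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.PCollection;

    /** Keeps only results from the final pane of each window. */
    static PCollection<String> onlyFinalResults(PCollection<String> writeResults) {
      return writeResults.apply("FinalPaneOnly", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
          PaneInfo pane = c.pane();
          // Ignore intermediate panes; act only once the sink marks this pane
          // as the last one it will produce for this window.
          if (pane.isLast()) {
            c.output(c.element()); // "window" can now be treated as fully written
          }
        }
      }));
    }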
On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:

+1

At the very least an empty PCollection<?> could be produced, with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.

On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:

+dev@

I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to a real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.

Kenn

On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:

I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful for users, so they can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure-handling policies to sinks so that failed writes don't jam up the pipeline.

On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:

So I agree generally with the idea that returning a PCollection makes all of this easier, so that arbitrary additional functions can be added. But what exactly would write functions be returning in a PCollection that would make sense? The whole idea is that we've written to an external source and now the collection itself is no longer needed.

Currently that's represented with a PDone, which doesn't allow any work to occur after it. I see a couple of possible ways of handling this given this conversation, and am curious which solution sounds like the best way to deal with the problem:

1. Have output transforms always return something specific (which would be the same across transforms by convention), in the form of a PCollection, so operations can occur after it.

2. Make either PDone or some new type that can act as a PCollection, so we can run applies afterward.

3. Make output transforms provide the facility for a callback function which runs after the transform is complete.

I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output but still needs to have operations occur after it.

The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.

Chet
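A minimal sketch of option 1 above (WriteToFoo is hypothetical; the actual external write is reduced to a comment):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical sink that, instead of PDone, returns one result element
    // per written input element, so downstream transforms can take a data
    // dependency on the write.
    class WriteToFoo extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> input) {
        return input.apply("WriteRecords", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // ... write c.element() to the external system here ...
            c.output(c.element()); // emit the element as its "write result"
          }
        }));
      }
    }

    // Usage: anything applied to "written" happens after those writes.
    // PCollection<String> written = records.apply(new WriteToFoo());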
Perhaps we could also start up a ticket on >>>>>>>>>>> this, since it >>>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it >>>>>>>>>>> really useful, for >>>>>>>>>>> >>> one. >>>>>>>>>>> >>> >>>>>>>>>>> >>> Chet >>>>>>>>>>> >>> >>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>> >>>>>>>>>>> >>> Instead of a callback fn, its most useful if a PCollection >>>>>>>>>>> is returned >>>>>>>>>>> >>> containing the result of the sink so that any arbitrary >>>>>>>>>>> additional functions >>>>>>>>>>> >>> can be applied. >>>>>>>>>>> >>> >>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré < >>>>>>>>>>> j...@nanthrax.net> >>>>>>>>>>> >>> wrote: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more >>>>>>>>>>> than in the >>>>>>>>>>> >>>> main. >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Regards >>>>>>>>>>> >>>> JB >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote: >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> I do something almost exactly like this, but with >>>>>>>>>>> BigtableIO instead. >>>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I >>>>>>>>>>> need to finish this >>>>>>>>>>> >>>>> up...). It would really be nice for most IOs to support >>>>>>>>>>> something like >>>>>>>>>>> >>>>> this. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the >>>>>>>>>>> output from >>>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function >>>>>>>>>>> which will run when >>>>>>>>>>> >>>>> all writes finish. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> You probably want to avoid doing something in the main >>>>>>>>>>> method because >>>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver >>>>>>>>>>> will die, get >>>>>>>>>>> >>>>> killed, machine will explode, etc). >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997 >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick < >>>>>>>>>>> nerdyn...@gmail.com >>>>>>>>>>> >>>>> <mailto:nerdyn...@gmail.com>> wrote: >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Assuming you're in Java. You could just follow on in >>>>>>>>>>> your Main >>>>>>>>>>> >>>>> method. >>>>>>>>>>> >>>>> Checking the state of the Result. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Example: >>>>>>>>>>> >>>>> PipelineResult result = pipeline.run(); >>>>>>>>>>> >>>>> try { >>>>>>>>>>> >>>>> result.waitUntilFinish(); >>>>>>>>>>> >>>>> if(result.getState() == PipelineResult.State.DONE) { >>>>>>>>>>> >>>>> //DO ES work >>>>>>>>>>> >>>>> } >>>>>>>>>>> >>>>> } catch(Exception e) { >>>>>>>>>>> >>>>> result.cancel(); >>>>>>>>>>> >>>>> throw e; >>>>>>>>>>> >>>>> } >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a work >>>>>>>>>>> flow. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré >>>>>>>>>>> >>>>> <j...@nanthrax.net >>>>>>>>>>> >>>>> <mailto:j...@nanthrax.net>> wrote: >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Hi, >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> yes, we had a similar question some days ago. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> We can imagine to have a user callback fn fired >>>>>>>>>>> when the sink >>>>>>>>>>> >>>>> batch is >>>>>>>>>>> >>>>> complete. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Let me think about that. 
On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:

Assuming you're in Java, you could just follow on in your main method, checking the state of the PipelineResult. Example:

    PipelineResult result = pipeline.run();
    try {
      result.waitUntilFinish();
      if (result.getState() == PipelineResult.State.DONE) {
        // do ES work
      }
    } catch (Exception e) {
      result.cancel();
      throw e;
    }

Otherwise you could also use Oozie to construct a workflow.

On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi,

Yes, we had a similar question some days ago.

We can imagine having a user callback fn fired when the sink batch is complete.

Let me think about that.

Regards
JB

On 12/01/2017 09:04 AM, Philip Chan wrote:

Hey JB,

Thanks for getting back so quickly.

I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap. The problem with this is that I can't think of a good way to determine that termination state, short of polling the new index to check the document count compared to the size of the input PCollection. That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).

But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes". Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another?

Thanks again,
Philip

On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Philip,

You won't be able to do (3) in the same pipeline, as the Elasticsearch sink PTransform ends the pipeline with PDone.

So, (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel, for instance). I would do a check of the data in the index and then trigger the swap there.

Regards
JB

On 12/01/2017 08:41 AM, Philip Chan wrote:

Hi,

I'm pretty new to Beam, and I've been trying to use the ElasticSearchIO sink to write docs into ES. With this, I want to be able to:

1. ingest and transform rows from a DB (done)
2. write JSON docs/strings into a new ES index (done)
3. after (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.

The problem I'm facing is that I can't find a way to make (3) happen after step (2) is complete.

The ElasticsearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there, because it doesn't seem to let me do another apply on it to "define" a dependency:
https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
Is there a recommended way to construct pipeline workflows like this?

Thanks in advance,
Philip