This is only for "final pane" waiting, correct? So someone who writes a sink in the global window probably would not want to use this.
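For concreteness, a minimal sketch of how the Wait.on transform proposed in PR 4301 (quoted below) might be used, assuming it lands roughly as described in the thread. The "FirstWrite" and "AfterWrite" steps are placeholders standing in for a real sink and a real follow-up action:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> rows = p.apply(Create.of("a", "b", "c"));

    // Hypothetical first write; all that matters is that it emits some
    // result collection whose final pane means "this window has been written".
    PCollection<String> writeSignal =
        rows.apply("FirstWrite", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // ... write c.element() to the external system ...
            c.output(c.element());
          }
        }));

    // Hold back each window of "rows" until the final pane of "writeSignal"
    // in the same window has fired, then run the follow-up step.
    rows.apply(Wait.on(writeSignal))
        .apply("AfterWrite", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // e.g. index swap, notification, or any other post-write action
          }
        }));

    p.run().waitUntilFinish();
  }
}

As the question at the top of the thread points out, this only gives "everything for this window has been written" semantics; a sink running in the global window, whose final pane may never fire in streaming, would not get a useful signal out of it.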
On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <[email protected]> wrote: > PR is out https://github.com/apache/beam/pull/4301 > > This should allow us to have useful sequencing for sinks like BigtableIO / > BigQueryIO. > > Adding a couple of interested parties: > - Steve, would you be interested in using this in > https://github.com/apache/beam/pull/3997 ? > - Mairbek: this should help in https://github.com/apache/beam/pull/4264 - > in particular, this works properly in case the input can be firing multiple > times. > > On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <[email protected]> > wrote: > >> I figured out the Never.ever() approach and it seems to work. Will finish >> this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this >> will be quite a useful transform. >> >> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <[email protected]> >> wrote: >> >>> I'm a bit confused by all of these suggestions: they sound plausible at >>> a high level, but I'm having a hard time making any one of them concrete. >>> >>> So suppose we want to create a transform Wait.on(PCollection<?> signal): >>> PCollection<T> -> PCollection<T>. >>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to >>> "a", but buffers panes of "a" in any given window until the final pane of >>> "sig" in the same window is fired (or, if it's never fired, until the >>> window closes? could use a deadletter for that maybe). >>> >>> This transform I suppose would need to have a keyed and unkeyed version. >>> >>> The keyed version would support merging window fns, and would require >>> "a" and "sig" to be keyed by the same key, and would work using a CoGbk - >>> followed by a stateful ParDo? Or is there a way to get away without a >>> stateful ParDo here? (not all runners support it) >>> >>> The unkeyed version would not support merging window fns. Reuven, can >>> you elaborate how your combiner idea would work here - in particular, what >>> do you mean by "triggering only on the final pane"? Do you mean filter >>> non-final panes before entering the combiner? I wonder if that'll work, >>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on >>> the side input with a Never.ever() trigger"? >>> >>> Thanks. >>> >>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <[email protected]> wrote: >>> >>>> This is an interesting point. >>>> >>>> In the past, we've often just though about sequencing some action to >>>> take place after the sink, in which case you can simply use the sink output >>>> as a main input. However if you want to run a transform with another >>>> PCollection as a main input, this doesn't work. And as you've discovered, >>>> triggered side inputs are defined to be non-deterministic, and there's no >>>> way to make things line up. >>>> >>>> What you're describing only makes sense if you're blocking against the >>>> final pane (since otherwise there's no reasonable way to match up somePC >>>> panes with the sink panes). There are multiple ways you can do this: one >>>> would be to CGBK the two PCollections together, and trigger the new >>>> transform only on the final pane. Another would be to add a combiner that >>>> returns a Void, triggering only on the final pane, and then make this >>>> singleton Void a side input. You could also do something explicit with the >>>> state API. >>>> >>>> Reuven >>>> >>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <[email protected] >>>> > wrote: >>>> >>>>> So this appears not as easy as anticipated (surprise!) 
>>>>> >>>>> Suppose we have a PCollection "donePanes" with an element per >>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of >>>>> data has been written; this pane is: final / non-final". >>>>> >>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) >>>>> happens only after the final pane has been written. >>>>> >>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to >>>>> happen when c emits a *final* pane. >>>>> >>>>> Unfortunately, using >>>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) >>>>> doesn't do the trick: the side input becomes ready the moment *the >>>>> first *pane of data has been written. >>>>> >>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter >>>>> only final panes...).apply(View.asSingleton())). It also becomes >>>>> ready the moment *the first* pane has been written, you just get an >>>>> exception if you access the side input before the *final* pane was >>>>> written. >>>>> >>>>> I can't think of a pure-Beam solution to this: either "donePanes" will >>>>> be used as a main input to something (and then everything else can only be >>>>> a side input, which is not general enough), or it will be used as a side >>>>> input (and then we can't achieve "trigger only after the final pane >>>>> fires"). >>>>> >>>>> It seems that we need a way to control the side input pushback, and >>>>> configure whether a view becomes ready when its first pane has fired or >>>>> when its last pane has fired. I could see this be a property on the View >>>>> transform itself. In terms of implementation - I tried to figure out how >>>>> side input readiness is determined, in the direct runner and Dataflow >>>>> runner, and I'm completely lost and would appreciate some help. >>>>> >>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <[email protected]> wrote: >>>>> >>>>>> This sounds great! >>>>>> >>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> This would be absolutely great! It seems somewhat similar to the >>>>>>> changes that were made to the BigQuery sink to support WriteResult ( >>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/ >>>>>>> google-cloud-platform/src/main/java/org/apache/beam/sdk/ >>>>>>> io/gcp/bigquery/WriteResult.java). >>>>>>> >>>>>>> I find it helpful to think about the different things that may come >>>>>>> after a sink. For instance: >>>>>>> >>>>>>> 1. It might be helpful to have a collection of failing input >>>>>>> elements. The type of failed elements is pretty straightforward -- just >>>>>>> the >>>>>>> input elements. This allows handling such failures by directing them >>>>>>> elsewhere or performing additional processing. >>>>>>> >>>>>> >>>>>> BigQueryIO already does this as you point out. >>>>>> >>>>>>> >>>>>>> 2. For a sink that produces a series of files, it might be useful to >>>>>>> have a collection of the file names that have been completely written. >>>>>>> This >>>>>>> allows performing additional handling on these completed segments. >>>>>>> >>>>>> >>>>>> In fact we already do this for FileBasedSinks. See >>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99 >>>>>> 956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/ >>>>>> io/WriteFilesResult.java >>>>>> >>>>>>> >>>>>>> 3. 
For a sink that updates some destination, it would be reasonable >>>>>>> to have a collection that provides (periodically) output indicating how >>>>>>> complete the information written to that destination is. For instance, >>>>>>> this >>>>>>> might be something like "<this bigquery table> has all of the elements >>>>>>> up >>>>>>> to <input watermark>" complete. This allows tracking how much >>>>>>> information >>>>>>> has been completely written out. >>>>>>> >>>>>> >>>>>> Interesting. Maybe tough to do since sinks often don't have that >>>>>> knowledge. >>>>>> >>>>>> >>>>>>> >>>>>>> I think those concepts map to the more detailed description Eugene >>>>>>> provided, but I find it helpful to focus on what information comes out >>>>>>> of >>>>>>> the sink and how it might be used. >>>>>>> >>>>>>> Were there any use cases the above miss? Any functionality that has >>>>>>> been described that doesn't map to these use cases? >>>>>>> >>>>>>> -- Ben >>>>>>> >>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> It makes sense to consider how this maps onto existing kinds of >>>>>>>> sinks. >>>>>>>> >>>>>>>> E.g.: >>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): >>>>>>>> that will emit 1 result per bundle (either a bogus value or number of >>>>>>>> records written) that will be Combine'd into 1 result per pane of >>>>>>>> input. A >>>>>>>> user can sequence against this and be notified when some intermediate >>>>>>>> amount of data has been written for a window, or (via .isFinal()) when >>>>>>>> all >>>>>>>> of it has been written. >>>>>>>> - Something that e.g. initiates an import job, such as >>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic >>>>>>>> index swap: should emit 1 result per import job, e.g. containing >>>>>>>> information about the job (e.g. its id and statistics). Role of panes >>>>>>>> is >>>>>>>> the same. >>>>>>>> - Something like above but that supports dynamic destinations: like >>>>>>>> in WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> >>>>>>>> where >>>>>>>> ResultT may be something like a list of files that were written for >>>>>>>> this >>>>>>>> pane of this destination. >>>>>>>> >>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> I agree that the proper API for enabling the use case "do >>>>>>>>> something after the data has been written" is to return a PCollection >>>>>>>>> of >>>>>>>>> objects where each object represents the result of writing some >>>>>>>>> identifiable subset of the data. Then one can apply a ParDo to this >>>>>>>>> PCollection, in order to "do something after this subset has been >>>>>>>>> written". >>>>>>>>> >>>>>>>>> The challenging part here is *identifying* the subset of the data >>>>>>>>> that's been written, in a way consistent with Beam's unified >>>>>>>>> batch/streaming model, where saying "all data has been written" is >>>>>>>>> not an >>>>>>>>> option because more data can arrive. >>>>>>>>> >>>>>>>>> The next choice is "a window of input has been written", but then >>>>>>>>> again, late data can arrive into a window as well. >>>>>>>>> >>>>>>>>> Next choice after that is "a pane of input has been written", but >>>>>>>>> per https://s.apache.org/beam-sink-triggers the term "pane of >>>>>>>>> input" is moot: triggering and panes should be something private to >>>>>>>>> the >>>>>>>>> sink, and the same input can trigger different sinks differently. 
The >>>>>>>>> hypothetical different accumulation modes make this trickier still. >>>>>>>>> I'm not >>>>>>>>> sure whether we intend to also challenge the idea that windowing is >>>>>>>>> inherent to the collection, or whether it too should be specified on a >>>>>>>>> transform that processes the collection. I think for the sake of this >>>>>>>>> discussion we can assume that it's inherent, and assume the mental >>>>>>>>> model >>>>>>>>> that the elements in different windows of a PCollection are processed >>>>>>>>> independently - "as if" there were multiple pipelines processing each >>>>>>>>> window. >>>>>>>>> >>>>>>>>> Overall, embracing the full picture, we end up with something like >>>>>>>>> this: >>>>>>>>> - The input PCollection is a composition of windows. >>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding >>>>>>>>> windows), the below applies to the entire contents of the >>>>>>>>> PCollection. If >>>>>>>>> it's merging (e.g. session windows), then it applies per-key, and the >>>>>>>>> input >>>>>>>>> should be (perhaps implicitly) keyed in a way that the sink >>>>>>>>> understands - >>>>>>>>> for example, the grouping by destination in DynamicDestinations in >>>>>>>>> file and >>>>>>>>> bigquery writes. >>>>>>>>> - Each window's contents is a "changelog" - stream of elements and >>>>>>>>> retractions. >>>>>>>>> - A "sink" processes each window of the collection, deciding how >>>>>>>>> to handle elements and retractions (and whether to support >>>>>>>>> retractions at >>>>>>>>> all) in a sink-specific way, and deciding *when* to perform the side >>>>>>>>> effects for a portion of the changelog (a "pane") based on the sink's >>>>>>>>> triggering strategy. >>>>>>>>> - If the side effect itself is parallelized, then there'll be >>>>>>>>> multiple results for the pane - e.g. one per bundle. >>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list >>>>>>>>> of filenames that have been written, or simply a number of records >>>>>>>>> that was >>>>>>>>> written, or a bogus void value etc. The result will implicitly >>>>>>>>> include the >>>>>>>>> window of the input it's associated with. It will also implicitly >>>>>>>>> include >>>>>>>>> pane information - index of the pane in this window, and whether this >>>>>>>>> is >>>>>>>>> the first or last pane. >>>>>>>>> - The partitioning into bundles is an implementation detail and >>>>>>>>> not very useful, so before presenting the pane write results to the >>>>>>>>> user, >>>>>>>>> the sink will probably want to Combine the bundle results so that >>>>>>>>> there >>>>>>>>> ends up being 1 value for each pane that was written. Once again note >>>>>>>>> that >>>>>>>>> panes may be associated with windows of the input as a whole, but if >>>>>>>>> the >>>>>>>>> input is keyed (like with DynamicDestinations) they'll be associated >>>>>>>>> with >>>>>>>>> per-key subsets of windows of the input. >>>>>>>>> - This combining requires an extra, well, combining operation, so >>>>>>>>> it should be optional. >>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a >>>>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, >>>>>>>>> where >>>>>>>>> the elements of this collection will implicitly have window and pane >>>>>>>>> information, available via the implicit BoundedWindow and PaneInfo. >>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace >>>>>>>>> the fact that trigger strategy is set on the input. 
But in that case >>>>>>>>> the >>>>>>>>> user will have to accept that the PaneInfo of ResultT's is not >>>>>>>>> necessarily >>>>>>>>> directly related to panes of the input - the sink is allowed to do >>>>>>>>> internal >>>>>>>>> aggregation as an implementation detail, which may modify the >>>>>>>>> triggering >>>>>>>>> strategy. Basically the user will still get sink-assigned panes. >>>>>>>>> - In most cases, one may imagine that the user is interested in >>>>>>>>> being notified of "no more data associated with this window will be >>>>>>>>> written", so the user will ignore all ResultT's except those where >>>>>>>>> the pane >>>>>>>>> is marked final. If a user is interested in being notified of >>>>>>>>> intermediate >>>>>>>>> write results - they'll have to embrace the fact that they cannot >>>>>>>>> identify >>>>>>>>> the precise subset of input associated with the intermediate result. >>>>>>>>> >>>>>>>>> I think the really key points of the above are: >>>>>>>>> - Sinks should support windowed input. Sinks should write >>>>>>>>> different windows of input independently. If the sink can write >>>>>>>>> multi-destination input, the destination should function as a >>>>>>>>> grouping key, >>>>>>>>> and in that case merging windowing should be allowed. >>>>>>>>> - Producing a PCollection of write results should be optional. >>>>>>>>> - When asked to produce results, sinks produce a PCollection of >>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in >>>>>>>>> the >>>>>>>>> window of the input that was written, and have a PaneInfo assigned by >>>>>>>>> the >>>>>>>>> sink, of which probably the only part useful to the user is whether >>>>>>>>> it's >>>>>>>>> .isFinal(). >>>>>>>>> >>>>>>>>> Does this sound reasonable? >>>>>>>>> >>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> +1 >>>>>>>>>> >>>>>>>>>> At the very least an empty PCollection<?> could be produced with >>>>>>>>>> no >>>>>>>>>> promises about its contents but the ability to be followed (e.g. >>>>>>>>>> as a >>>>>>>>>> side input), which is forward compatible with whatever actual >>>>>>>>>> metadata >>>>>>>>>> one may decide to produce in the future. >>>>>>>>>> >>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> > +dev@ >>>>>>>>>> > >>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are >>>>>>>>>> easy to >>>>>>>>>> > understand and a good way for an IO to communicate and >>>>>>>>>> establish causal >>>>>>>>>> > dependencies. Converting an IO from PDone to real output may >>>>>>>>>> spur further >>>>>>>>>> > useful thoughts based on the design decisions about what sort >>>>>>>>>> of output is >>>>>>>>>> > most useful. >>>>>>>>>> > >>>>>>>>>> > Kenn >>>>>>>>>> > >>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >> >>>>>>>>>> >> I think all sinks actually do have valuable information to >>>>>>>>>> output which >>>>>>>>>> >> can be used after a write (file names, transaction/commit/row >>>>>>>>>> ids, table >>>>>>>>>> >> names, ...). In addition to this metadata, having a >>>>>>>>>> PCollection of all >>>>>>>>>> >> successful writes and all failed writes is useful for users so >>>>>>>>>> they can >>>>>>>>>> >> chain an action which depends on what was or wasn't >>>>>>>>>> successfully written. 
>>>>>>>>>> >> Users have requested adding retry/failure handling policies to >>>>>>>>>> sinks so that >>>>>>>>>> >> failed writes don't jam up the pipeline. >>>>>>>>>> >> >>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich < >>>>>>>>>> [email protected]> >>>>>>>>>> >> wrote: >>>>>>>>>> >>> >>>>>>>>>> >>> So I agree generally with the idea that returning a >>>>>>>>>> PCollection makes all >>>>>>>>>> >>> of this easier so that arbitrary additional functions can be >>>>>>>>>> added, what >>>>>>>>>> >>> exactly would write functions be returning in a PCollection >>>>>>>>>> that would make >>>>>>>>>> >>> sense? The whole idea is that we’ve written to an external >>>>>>>>>> source and now >>>>>>>>>> >>> the collection itself is no longer needed. >>>>>>>>>> >>> >>>>>>>>>> >>> Currently, that’s represented with a PDone, but currently >>>>>>>>>> that doesn’t >>>>>>>>>> >>> allow any work to occur after it. I see a couple possible >>>>>>>>>> ways of handling >>>>>>>>>> >>> this given this conversation, and am curious which solution >>>>>>>>>> sounds like the >>>>>>>>>> >>> best way to deal with the problem: >>>>>>>>>> >>> >>>>>>>>>> >>> 1. Have output transforms always return something specific >>>>>>>>>> (which would >>>>>>>>>> >>> be the same across transforms by convention), that is in the >>>>>>>>>> form of a >>>>>>>>>> >>> PCollection, so operations can occur after it. >>>>>>>>>> >>> >>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a >>>>>>>>>> PCollection so we >>>>>>>>>> >>> can run applies afterward. >>>>>>>>>> >>> >>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback >>>>>>>>>> function >>>>>>>>>> >>> which runs after the transform is complete. >>>>>>>>>> >>> >>>>>>>>>> >>> I went through these gymnastics recently when I was trying to >>>>>>>>>> build >>>>>>>>>> >>> something that would move indices after writing to Algolia, >>>>>>>>>> and the solution >>>>>>>>>> >>> was to co-opt code from the old Sink class that used to exist >>>>>>>>>> in Beam. The >>>>>>>>>> >>> problem is that particular method requires the output >>>>>>>>>> transform in question >>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t >>>>>>>>>> make sense to >>>>>>>>>> >>> return one. This seems like a bad solution, but unfortunately >>>>>>>>>> there isn’t a >>>>>>>>>> >>> notion of a transform that has no explicit output that needs >>>>>>>>>> to have >>>>>>>>>> >>> operations occur after it. >>>>>>>>>> >>> >>>>>>>>>> >>> The three potential solutions above address this issue, but I >>>>>>>>>> would like >>>>>>>>>> >>> to hear on which would be preferable (or perhaps a different >>>>>>>>>> proposal >>>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on >>>>>>>>>> this, since it >>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it >>>>>>>>>> really useful, for >>>>>>>>>> >>> one. >>>>>>>>>> >>> >>>>>>>>>> >>> Chet >>>>>>>>>> >>> >>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>> >>>>>>>>>> >>> Instead of a callback fn, its most useful if a PCollection is >>>>>>>>>> returned >>>>>>>>>> >>> containing the result of the sink so that any arbitrary >>>>>>>>>> additional functions >>>>>>>>>> >>> can be applied. 
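To make the "return a PCollection from the sink" idea concrete, here is a rough sketch against BigQueryIO's existing WriteResult, which the thread mentions earlier. The table name is a placeholder, this assumes the streaming-inserts path (which is what populates the failed-inserts output, as far as I recall), and the dead-letter handling shown is just one possible follow-up:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Inside an existing pipeline, where "rows" is a PCollection<TableRow>.
WriteResult writeResult =
    rows.apply("WriteToBigQuery",
        BigQueryIO.writeTableRows().to("project:dataset.table"));

// Because the sink returns a real PCollection rather than PDone, any
// follow-up transform can simply be chained onto it -- here, rows that
// failed to insert go to a dead-letter handler.
PCollection<TableRow> failedRows = writeResult.getFailedInserts();
failedRows.apply("HandleFailedInserts", ParDo.of(new DoFn<TableRow, Void>() {
  @ProcessElement
  public void process(ProcessContext c) {
    // e.g. log c.element(), write it to a dead-letter table, or raise an alert
  }
}));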
>>>>>>>>>> >>> >>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré < >>>>>>>>>> [email protected]> >>>>>>>>>> >>> wrote: >>>>>>>>>> >>>> >>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than >>>>>>>>>> in the >>>>>>>>>> >>>> main. >>>>>>>>>> >>>> >>>>>>>>>> >>>> Regards >>>>>>>>>> >>>> JB >>>>>>>>>> >>>> >>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> I do something almost exactly like this, but with >>>>>>>>>> BigtableIO instead. >>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I >>>>>>>>>> need to finish this >>>>>>>>>> >>>>> up...). It would really be nice for most IOs to support >>>>>>>>>> something like >>>>>>>>>> >>>>> this. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the >>>>>>>>>> output from >>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function which >>>>>>>>>> will run when >>>>>>>>>> >>>>> all writes finish. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> You probably want to avoid doing something in the main >>>>>>>>>> method because >>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver >>>>>>>>>> will die, get >>>>>>>>>> >>>>> killed, machine will explode, etc). >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997 >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick < >>>>>>>>>> [email protected] >>>>>>>>>> >>>>> <mailto:[email protected]>> wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Assuming you're in Java. You could just follow on in >>>>>>>>>> your Main >>>>>>>>>> >>>>> method. >>>>>>>>>> >>>>> Checking the state of the Result. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Example: >>>>>>>>>> >>>>> PipelineResult result = pipeline.run(); >>>>>>>>>> >>>>> try { >>>>>>>>>> >>>>> result.waitUntilFinish(); >>>>>>>>>> >>>>> if(result.getState() == PipelineResult.State.DONE) { >>>>>>>>>> >>>>> //DO ES work >>>>>>>>>> >>>>> } >>>>>>>>>> >>>>> } catch(Exception e) { >>>>>>>>>> >>>>> result.cancel(); >>>>>>>>>> >>>>> throw e; >>>>>>>>>> >>>>> } >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a work >>>>>>>>>> flow. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré >>>>>>>>>> >>>>> <[email protected] >>>>>>>>>> >>>>> <mailto:[email protected]>> wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Hi, >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> yes, we had a similar question some days ago. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> We can imagine to have a user callback fn fired >>>>>>>>>> when the sink >>>>>>>>>> >>>>> batch is >>>>>>>>>> >>>>> complete. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Let me think about that. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Regards >>>>>>>>>> >>>>> JB >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Hey JB, >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Thanks for getting back so quickly. >>>>>>>>>> >>>>> I suppose in that case I would need a way of >>>>>>>>>> monitoring >>>>>>>>>> >>>>> when the ES >>>>>>>>>> >>>>> transform completes successfully before I can >>>>>>>>>> proceed with >>>>>>>>>> >>>>> doing the >>>>>>>>>> >>>>> swap. >>>>>>>>>> >>>>> The problem with this is that I can't think of >>>>>>>>>> a good way >>>>>>>>>> >>>>> to >>>>>>>>>> >>>>> determine that termination state short of >>>>>>>>>> polling the new >>>>>>>>>> >>>>> index to >>>>>>>>>> >>>>> check the document count compared to the size >>>>>>>>>> of input >>>>>>>>>> >>>>> PCollection. 
>>>>>>>>>> >>>>> That, or maybe I'd need to use an external >>>>>>>>>> system like you >>>>>>>>>> >>>>> mentioned >>>>>>>>>> >>>>> to poll on the state of the pipeline (I'm using >>>>>>>>>> Google >>>>>>>>>> >>>>> Dataflow, so >>>>>>>>>> >>>>> maybe there's a way to do this with some API). >>>>>>>>>> >>>>> But I would have thought that there would be an >>>>>>>>>> easy way of >>>>>>>>>> >>>>> simply >>>>>>>>>> >>>>> saying "do not process this transform until >>>>>>>>>> this other >>>>>>>>>> >>>>> transform >>>>>>>>>> >>>>> completes". >>>>>>>>>> >>>>> Is there no established way of "signaling" >>>>>>>>>> between >>>>>>>>>> >>>>> pipelines when >>>>>>>>>> >>>>> some pipeline completes, or have some way of >>>>>>>>>> declaring a >>>>>>>>>> >>>>> dependency >>>>>>>>>> >>>>> of 1 transform on another transform? >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Thanks again, >>>>>>>>>> >>>>> Philip >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste >>>>>>>>>> Onofré >>>>>>>>>> >>>>> <[email protected] <mailto:[email protected]> >>>>>>>>>> >>>>> <mailto:[email protected] >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> <mailto:[email protected]>>> wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Hi Philip, >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> You won't be able to do (3) in the same >>>>>>>>>> pipeline as >>>>>>>>>> >>>>> the >>>>>>>>>> >>>>> Elasticsearch Sink >>>>>>>>>> >>>>> PTransform ends the pipeline with PDone. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> So, (3) has to be done in another pipeline >>>>>>>>>> (using a >>>>>>>>>> >>>>> DoFn) or in >>>>>>>>>> >>>>> another >>>>>>>>>> >>>>> "system" (like Camel for instance). I >>>>>>>>>> would do a check >>>>>>>>>> >>>>> of the >>>>>>>>>> >>>>> data in the >>>>>>>>>> >>>>> index and then trigger the swap there. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Regards >>>>>>>>>> >>>>> JB >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Hi, >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> I'm pretty new to Beam, and I've been >>>>>>>>>> trying to >>>>>>>>>> >>>>> use the >>>>>>>>>> >>>>> ElasticSearchIO >>>>>>>>>> >>>>> sink to write docs into ES. >>>>>>>>>> >>>>> With this, I want to be able to >>>>>>>>>> >>>>> 1. ingest and transform rows from DB >>>>>>>>>> (done) >>>>>>>>>> >>>>> 2. write JSON docs/strings into a new >>>>>>>>>> ES index >>>>>>>>>> >>>>> (done) >>>>>>>>>> >>>>> 3. After (2) is complete and all >>>>>>>>>> documents are >>>>>>>>>> >>>>> written into >>>>>>>>>> >>>>> a new index, >>>>>>>>>> >>>>> trigger an atomic index swap under an >>>>>>>>>> alias to >>>>>>>>>> >>>>> replace the >>>>>>>>>> >>>>> current >>>>>>>>>> >>>>> aliased index with the new index >>>>>>>>>> generated in step >>>>>>>>>> >>>>> 2. This >>>>>>>>>> >>>>> is basically >>>>>>>>>> >>>>> a single POST request to the ES >>>>>>>>>> cluster. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> The problem I'm facing is that I don't >>>>>>>>>> seem to be >>>>>>>>>> >>>>> able to >>>>>>>>>> >>>>> find a way to >>>>>>>>>> >>>>> have a way for (3) to happen after >>>>>>>>>> step (2) is >>>>>>>>>> >>>>> complete. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> The ElasticSearchIO.Write transform >>>>>>>>>> returns a >>>>>>>>>> >>>>> PDone, and >>>>>>>>>> >>>>> I'm not sure >>>>>>>>>> >>>>> how to proceed from there because it >>>>>>>>>> doesn't seem >>>>>>>>>> >>>>> to let me >>>>>>>>>> >>>>> do another >>>>>>>>>> >>>>> apply on it to "define" a dependency. >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2. 
>>>>>>>>>> 1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Is there a recommended way to construct pipeline workflows like this?
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Thanks in advance,
>>>>>>>>>> >>>>> Philip
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> -- Jean-Baptiste Onofré
>>>>>>>>>> >>>>> [email protected]
>>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> -- Jean-Baptiste Onofré
>>>>>>>>>> >>>>> [email protected]
>>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> -- Nick Verbeck - NerdyNick
>>>>>>>>>> >>>>> ----------------------------------------------------
>>>>>>>>>> >>>>> NerdyNick.com
>>>>>>>>>> >>>>> TrailsOffroad.com
>>>>>>>>>> >>>>> NoKnownBoundaries.com
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> --
>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>> >>>> [email protected]
>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>> >>>> Talend - http://www.talend.com
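For the original Elasticsearch use case, the GroupByKey/Combine idea suggested upthread by Steve and Reuven would look roughly like the sketch below. It assumes a hypothetical write step that emits one element per successfully written document instead of returning PDone (ElasticsearchIO.write() itself still ends in PDone), and all names here are made up; the triggering caveats Eugene runs into above, which are what Wait.on was later proposed to address, still apply:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Assume "writeResults" is the hypothetical output of the write step,
// a PCollection<String> with one element per successfully written document.

// Collapse each pane of write results to a single element per window,
// so the follow-up runs once per pane rather than once per document.
PCollection<Long> docsWrittenPerPane =
    writeResults.apply("CountWrites",
        Combine.globally(Count.<String>combineFn()).withoutDefaults());

// Act only on the final pane of each window, i.e. once no more results
// are expected for that window ("final pane" in the thread = PaneInfo.isLast()).
docsWrittenPerPane.apply("SwapAliasOnFinalPane", ParDo.of(new DoFn<Long, Void>() {
  @ProcessElement
  public void process(ProcessContext c) {
    if (c.pane().isLast()) {
      // issue the atomic alias-swap POST against the Elasticsearch cluster here
    }
  }
}));

Whether isLast() ever fires depends entirely on the windowing and triggering of the input, which is exactly the subtlety that pushed the discussion toward a dedicated Wait.on transform.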
