This is only for "final pane" waiting, correct? So someone who writes a
sink in the global window probably would not want to use this.

On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <[email protected]>
wrote:

> PR is out https://github.com/apache/beam/pull/4301
>
> This should allow us to have useful sequencing for sinks like BigtableIO /
> BigQueryIO.
>
> Adding a couple of interested parties:
> - Steve, would you be interested in using this in
> https://github.com/apache/beam/pull/3997 ?
> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
> in particular, this works properly in case the input can be firing multiple
> times.
>
> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> I figured out the Never.ever() approach and it seems to work. Will finish
>> this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
>> will be quite a useful transform.
>>
>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> I'm a bit confused by all of these suggestions: they sound plausible at
>>> a high level, but I'm having a hard time making any one of them concrete.
>>>
>>> So suppose we want to create a transform Wait.on(PCollection<?> signal):
>>> PCollection<T> -> PCollection<T>.
>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
>>> "a", but buffers panes of "a" in any given window until the final pane of
>>> "sig" in the same window is fired (or, if it's never fired, until the
>>> window closes? could use a deadletter for that maybe).
>>>
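A minimal plain-Java sketch of the buffering behavior described above, assuming windows are identified by a string and elements are strings. It is not Beam code: WaitOnSketch, onMainPane and onSignalPane are hypothetical names. It only models "hold back panes of a until the final pane of sig fires in the same window".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the Wait.on semantics sketched above. Not Beam API:
// WaitOnSketch and its methods are hypothetical names for illustration.
class WaitOnSketch {
    // Elements of "a" held back, per window, until "sig" is final there.
    private final Map<String, List<String>> buffered = new HashMap<>();
    // Windows whose final "sig" pane has already fired.
    private final Set<String> signalFinal = new HashSet<>();

    /** A pane of "a" arrives; returns whatever may flow downstream now. */
    public List<String> onMainPane(String window, List<String> elements) {
        if (signalFinal.contains(window)) {
            // The signal is already final in this window: pass through.
            return new ArrayList<>(elements);
        }
        buffered.computeIfAbsent(window, w -> new ArrayList<>()).addAll(elements);
        return new ArrayList<>();
    }

    /** A pane of "sig" arrives; only its final pane releases the buffer. */
    public List<String> onSignalPane(String window, boolean isFinal) {
        if (!isFinal) {
            return new ArrayList<>();
        }
        signalFinal.add(window);
        List<String> released = buffered.remove(window);
        return released == null ? new ArrayList<>() : released;
    }

    public static void main(String[] args) {
        WaitOnSketch wait = new WaitOnSketch();
        System.out.println(wait.onMainPane("w1", Arrays.asList("a1", "a2")));
        System.out.println(wait.onSignalPane("w1", false));
        System.out.println(wait.onSignalPane("w1", true));
        System.out.println(wait.onMainPane("w1", Arrays.asList("a3")));
    }
}
```

The sketch leaves out the open questions from the paragraph above: what happens if the final pane never fires before the window closes, and how merging windows would be handled.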
>>> This transform I suppose would need to have a keyed and unkeyed version.
>>>
>>> The keyed version would support merging window fns, and would require
>>> "a" and "sig" to be keyed by the same key, and would work using a CoGbk -
>>> followed by a stateful ParDo? Or is there a way to get away without a
>>> stateful ParDo here? (not all runners support it)
>>>
>>> The unkeyed version would not support merging window fns. Reuven, can
>>> you elaborate how your combiner idea would work here - in particular, what
>>> do you mean by "triggering only on the final pane"? Do you mean filter
>>> non-final panes before entering the combiner? I wonder if that'll work,
>>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on
>>> the side input with a Never.ever() trigger"?
>>>
>>> Thanks.
>>>
>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> This is an interesting point.
>>>>
>>>> In the past, we've often just thought about sequencing some action to
>>>> take place after the sink, in which case you can simply use the sink output
>>>> as a main input. However if you want to run a transform with another
>>>> PCollection as a main input, this doesn't work. And as you've discovered,
>>>> triggered side inputs are defined to be non-deterministic, and there's no
>>>> way to make things line up.
>>>>
>>>> What you're describing only makes sense if you're blocking against the
>>>> final pane (since otherwise there's no reasonable way to match up somePC
>>>> panes with the sink panes). There are multiple ways you can do this: one
>>>> would be to CGBK the two PCollections together, and trigger the new
>>>> transform only on the final pane. Another would be to add a combiner that
>>>> returns a Void, triggering only on the final pane, and then make this
>>>> singleton Void a side input. You could also do something explicit with the
>>>> state API.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <[email protected]
>>>> > wrote:
>>>>
>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>
>>>>> Suppose we have a PCollection "donePanes" with an element per
>>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>>>> data has been written; this pane is: final / non-final".
>>>>>
>>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>>>> happens only after the final pane has been written.
>>>>>
>>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to
>>>>> happen when c emits a *final* pane.
>>>>>
>>>>> Unfortunately, using 
>>>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton()))
>>>>> doesn't do the trick: the side input becomes ready the moment *the
>>>>> first *pane of data has been written.
>>>>>
>>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>>>> only final panes...).apply(View.asSingleton())). It also becomes
>>>>> ready the moment *the first* pane has been written, you just get an
>>>>> exception if you access the side input before the *final* pane was
>>>>> written.
>>>>>
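A toy plain-Java model of the readiness problem described above, assuming (as the message reports) that a singleton view counts as ready once any pane of its input has fired, and that reading an empty view throws. This is not runner code; SingletonViewSketch and its methods are hypothetical, and the exception type is an assumption of the model.

```java
import java.util.NoSuchElementException;

// Toy model of the behavior reported above; names are hypothetical.
class SingletonViewSketch {
    // Readiness flips as soon as the first pane of donePanes fires.
    private boolean anyPaneFired = false;
    // Set only when an element survives the "final panes only" filter.
    private String value = null;

    /** A pane of donePanes fires; non-final panes are filtered out. */
    public void onPane(boolean isFinal, String element) {
        anyPaneFired = true;
        if (isFinal) {
            value = element;
        }
    }

    /** The main input is unblocked as soon as this returns true. */
    public boolean isReady() {
        return anyPaneFired;
    }

    /** Reading before the final pane fails, because the view is empty. */
    public String get() {
        if (value == null) {
            throw new NoSuchElementException("empty singleton view");
        }
        return value;
    }

    public static void main(String[] args) {
        SingletonViewSketch view = new SingletonViewSketch();
        view.onPane(false, "pane 0 written");
        System.out.println("ready after first pane: " + view.isReady());
        try {
            view.get();
        } catch (NoSuchElementException e) {
            System.out.println("access before final pane failed: " + e.getMessage());
        }
        view.onPane(true, "final pane written");
        System.out.println("value after final pane: " + view.get());
    }
}
```

In this model, readiness and availability of the value are decoupled, which is the gap the message is pointing at.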
>>>>> I can't think of a pure-Beam solution to this: either "donePanes" will
>>>>> be used as a main input to something (and then everything else can only be
>>>>> a side input, which is not general enough), or it will be used as a side
>>>>> input (and then we can't achieve "trigger only after the final pane 
>>>>> fires").
>>>>>
>>>>> It seems that we need a way to control the side input pushback, and
>>>>> configure whether a view becomes ready when its first pane has fired or
>>>>> when its last pane has fired. I could see this be a property on the View
>>>>> transform itself. In terms of implementation - I tried to figure out how
>>>>> side input readiness is determined, in the direct runner and Dataflow
>>>>> runner, and I'm completely lost and would appreciate some help.
>>>>>
>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> This sounds great!
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>>>> changes that were made to the BigQuery sink to support WriteResult
>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>>>
>>>>>>> I find it helpful to think about the different things that may come
>>>>>>> after a sink. For instance:
>>>>>>>
>>>>>>> 1. It might be helpful to have a collection of failing input
>>>>>>> elements. The type of failed elements is pretty straightforward -- just 
>>>>>>> the
>>>>>>> input elements. This allows handling such failures by directing them
>>>>>>> elsewhere or performing additional processing.
>>>>>>>
>>>>>>
>>>>>> BigQueryIO already does this as you point out.
>>>>>>
>>>>>>>
>>>>>>> 2. For a sink that produces a series of files, it might be useful to
>>>>>>> have a collection of the file names that have been completely written. 
>>>>>>> This
>>>>>>> allows performing additional handling on these completed segments.
>>>>>>>
>>>>>>
>>>>>> In fact we already do this for FileBasedSinks. See
>>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>
>>>>>>>
>>>>>>> 3. For a sink that updates some destination, it would be reasonable
>>>>>>> to have a collection that provides (periodically) output indicating how
>>>>>>> complete the information written to that destination is. For instance, 
>>>>>>> this
>>>>>>> might be something like "<this bigquery table> has all of the elements 
>>>>>>> up
>>>>>>> to <input watermark>" complete. This allows tracking how much 
>>>>>>> information
>>>>>>> has been completely written out.
>>>>>>>
>>>>>>
>>>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>>>> knowledge.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> I think those concepts map to the more detailed description Eugene
>>>>>>> provided, but I find it helpful to focus on what information comes out 
>>>>>>> of
>>>>>>> the sink and how it might be used.
>>>>>>>
>>>>>>> Were there any use cases the above miss? Any functionality that has
>>>>>>> been described that doesn't map to these use cases?
>>>>>>>
>>>>>>> -- Ben
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> It makes sense to consider how this maps onto existing kinds of
>>>>>>>> sinks.
>>>>>>>>
>>>>>>>> E.g.:
>>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write():
>>>>>>>> that will emit 1 result per bundle (either a bogus value or number of
>>>>>>>> records written) that will be Combine'd into 1 result per pane of 
>>>>>>>> input. A
>>>>>>>> user can sequence against this and be notified when some intermediate
>>>>>>>> amount of data has been written for a window, or (via .isFinal()) when 
>>>>>>>> all
>>>>>>>> of it has been written.
>>>>>>>> - Something that e.g. initiates an import job, such as
>>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>>>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>>>>> information about the job (e.g. its id and statistics). Role of panes 
>>>>>>>> is
>>>>>>>> the same.
>>>>>>>> - Something like above but that supports dynamic destinations: like
>>>>>>>> in WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> 
>>>>>>>> where
>>>>>>>> ResultT may be something like a list of files that were written for 
>>>>>>>> this
>>>>>>>> pane of this destination.
>>>>>>>>
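A minimal plain-Java sketch of the first case above: one result per bundle, combined into one result per pane. It assumes each bundle result carries its window, its pane index and a record count. BundleResult and combinePerPane are hypothetical names, and the loop stands in for what a Combine would do in a real pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model; not Beam API. All names here are hypothetical.
class PaneResultCombineSketch {
    /** One write result emitted by a single bundle. */
    static final class BundleResult {
        final String window;
        final int paneIndex;
        final long recordsWritten;

        public BundleResult(String window, int paneIndex, long recordsWritten) {
            this.window = window;
            this.paneIndex = paneIndex;
            this.recordsWritten = recordsWritten;
        }
    }

    /** Sums per-bundle counts so that each (window, pane) has one value. */
    public static Map<String, Long> combinePerPane(List<BundleResult> bundleResults) {
        Map<String, Long> perPane = new TreeMap<>();
        for (BundleResult r : bundleResults) {
            String key = r.window + "/pane" + r.paneIndex;
            perPane.merge(key, r.recordsWritten, Long::sum);
        }
        return perPane;
    }

    public static void main(String[] args) {
        List<BundleResult> results = Arrays.asList(
            new BundleResult("w1", 0, 10),
            new BundleResult("w1", 0, 5),
            new BundleResult("w1", 1, 7));
        System.out.println(combinePerPane(results));
    }
}
```

For the dynamic-destinations case, the key would additionally include the destination, giving one value per pane per destination.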
>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I agree that the proper API for enabling the use case "do
>>>>>>>>> something after the data has been written" is to return a PCollection 
>>>>>>>>> of
>>>>>>>>> objects where each object represents the result of writing some
>>>>>>>>> identifiable subset of the data. Then one can apply a ParDo to this
>>>>>>>>> PCollection, in order to "do something after this subset has been 
>>>>>>>>> written".
>>>>>>>>>
>>>>>>>>> The challenging part here is *identifying* the subset of the data
>>>>>>>>> that's been written, in a way consistent with Beam's unified
>>>>>>>>> batch/streaming model, where saying "all data has been written" is 
>>>>>>>>> not an
>>>>>>>>> option because more data can arrive.
>>>>>>>>>
>>>>>>>>> The next choice is "a window of input has been written", but then
>>>>>>>>> again, late data can arrive into a window as well.
>>>>>>>>>
>>>>>>>>> Next choice after that is "a pane of input has been written", but
>>>>>>>>> per https://s.apache.org/beam-sink-triggers the term "pane of
>>>>>>>>> input" is moot: triggering and panes should be something private to 
>>>>>>>>> the
>>>>>>>>> sink, and the same input can trigger different sinks differently. The
>>>>>>>>> hypothetical different accumulation modes make this trickier still. 
>>>>>>>>> I'm not
>>>>>>>>> sure whether we intend to also challenge the idea that windowing is
>>>>>>>>> inherent to the collection, or whether it too should be specified on a
>>>>>>>>> transform that processes the collection. I think for the sake of this
>>>>>>>>> discussion we can assume that it's inherent, and assume the mental 
>>>>>>>>> model
>>>>>>>>> that the elements in different windows of a PCollection are processed
>>>>>>>>> independently - "as if" there were multiple pipelines processing each
>>>>>>>>> window.
>>>>>>>>>
>>>>>>>>> Overall, embracing the full picture, we end up with something like
>>>>>>>>> this:
>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>>>>>>>> windows), the below applies to the entire contents of the 
>>>>>>>>> PCollection. If
>>>>>>>>> it's merging (e.g. session windows), then it applies per-key, and the 
>>>>>>>>> input
>>>>>>>>> should be (perhaps implicitly) keyed in a way that the sink 
>>>>>>>>> understands -
>>>>>>>>> for example, the grouping by destination in DynamicDestinations in 
>>>>>>>>> file and
>>>>>>>>> bigquery writes.
>>>>>>>>> - Each window's contents is a "changelog" - stream of elements and
>>>>>>>>> retractions.
>>>>>>>>> - A "sink" processes each window of the collection, deciding how
>>>>>>>>> to handle elements and retractions (and whether to support 
>>>>>>>>> retractions at
>>>>>>>>> all) in a sink-specific way, and deciding *when* to perform the side
>>>>>>>>> effects for a portion of the changelog (a "pane") based on the sink's
>>>>>>>>> triggering strategy.
>>>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list
>>>>>>>>> of filenames that have been written, or simply a number of records 
>>>>>>>>> that was
>>>>>>>>> written, or a bogus void value etc. The result will implicitly 
>>>>>>>>> include the
>>>>>>>>> window of the input it's associated with. It will also implicitly 
>>>>>>>>> include
>>>>>>>>> pane information - index of the pane in this window, and whether this 
>>>>>>>>> is
>>>>>>>>> the first or last pane.
>>>>>>>>> - The partitioning into bundles is an implementation detail and
>>>>>>>>> not very useful, so before presenting the pane write results to the 
>>>>>>>>> user,
>>>>>>>>> the sink will probably want to Combine the bundle results so that 
>>>>>>>>> there
>>>>>>>>> ends up being 1 value for each pane that was written. Once again note 
>>>>>>>>> that
>>>>>>>>> panes may be associated with windows of the input as a whole, but if 
>>>>>>>>> the
>>>>>>>>> input is keyed (like with DynamicDestinations) they'll be associated 
>>>>>>>>> with
>>>>>>>>> per-key subsets of windows of the input.
>>>>>>>>> - This combining requires an extra, well, combining operation, so
>>>>>>>>> it should be optional.
>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a
>>>>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, 
>>>>>>>>> where
>>>>>>>>> the elements of this collection will implicitly have window and pane
>>>>>>>>> information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace
>>>>>>>>> the fact that trigger strategy is set on the input. But in that case 
>>>>>>>>> the
>>>>>>>>> user will have to accept that the PaneInfo of ResultT's is not 
>>>>>>>>> necessarily
>>>>>>>>> directly related to panes of the input - the sink is allowed to do 
>>>>>>>>> internal
>>>>>>>>> aggregation as an implementation detail, which may modify the 
>>>>>>>>> triggering
>>>>>>>>> strategy. Basically the user will still get sink-assigned panes.
>>>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>>>> being notified of "no more data associated with this window will be
>>>>>>>>> written", so the user will ignore all ResultT's except those where 
>>>>>>>>> the pane
>>>>>>>>> is marked final. If a user is interested in being notified of 
>>>>>>>>> intermediate
>>>>>>>>> write results - they'll have to embrace the fact that they cannot 
>>>>>>>>> identify
>>>>>>>>> the precise subset of input associated with the intermediate result.
>>>>>>>>>
>>>>>>>>> I think the really key points of the above are:
>>>>>>>>> - Sinks should support windowed input. Sinks should write
>>>>>>>>> different windows of input independently. If the sink can write
>>>>>>>>> multi-destination input, the destination should function as a 
>>>>>>>>> grouping key,
>>>>>>>>> and in that case merging windowing should be allowed.
>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in 
>>>>>>>>> the
>>>>>>>>> window of the input that was written, and have a PaneInfo assigned by 
>>>>>>>>> the
>>>>>>>>> sink, of which probably the only part useful to the user is whether 
>>>>>>>>> it's
>>>>>>>>> .isFinal().
>>>>>>>>>
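A minimal plain-Java sketch of how a user might consume such results, assuming each result exposes its window and whether its pane is final. WriteResult here is a hypothetical stand-in, not the BigQuery WriteResult class, and the loop stands in for a ParDo that inspects pane information.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model; not Beam API. All names here are hypothetical.
class FinalPaneFilterSketch {
    /** A sink-produced result tagged with window and pane information. */
    static final class WriteResult {
        final String window;
        final int paneIndex;
        final boolean isFinal;

        public WriteResult(String window, int paneIndex, boolean isFinal) {
            this.window = window;
            this.paneIndex = paneIndex;
            this.isFinal = isFinal;
        }
    }

    /** Keeps only windows for which the sink reported a final pane. */
    public static List<String> fullyWrittenWindows(List<WriteResult> results) {
        List<String> done = new ArrayList<>();
        for (WriteResult r : results) {
            if (r.isFinal) {
                done.add(r.window);
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<WriteResult> results = Arrays.asList(
            new WriteResult("w1", 0, false),
            new WriteResult("w1", 1, true),
            new WriteResult("w2", 0, false));
        System.out.println(fullyWrittenWindows(results));
    }
}
```

Intermediate results are dropped here on purpose: as noted above, they cannot be tied to a precise subset of the input.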
>>>>>>>>> Does this sound reasonable?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> +1
>>>>>>>>>>
>>>>>>>>>> At the very least an empty PCollection<?> could be produced with
>>>>>>>>>> no
>>>>>>>>>> promises about its contents but the ability to be followed (e.g.
>>>>>>>>>> as a
>>>>>>>>>> side input), which is forward compatible with whatever actual
>>>>>>>>>> metadata
>>>>>>>>>> one may decide to produce in the future.
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>> > +dev@
>>>>>>>>>> >
>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are
>>>>>>>>>> easy to
>>>>>>>>>> > understand and a good way for an IO to communicate and
>>>>>>>>>> establish causal
>>>>>>>>>> > dependencies. Converting an IO from PDone to real output may
>>>>>>>>>> spur further
>>>>>>>>>> > useful thoughts based on the design decisions about what sort
>>>>>>>>>> of output is
>>>>>>>>>> > most useful.
>>>>>>>>>> >
>>>>>>>>>> > Kenn
>>>>>>>>>> >
>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> I think all sinks actually do have valuable information to
>>>>>>>>>> output which
>>>>>>>>>> >> can be used after a write (file names, transaction/commit/row
>>>>>>>>>> ids, table
>>>>>>>>>> >> names, ...). In addition to this metadata, having a
>>>>>>>>>> PCollection of all
>>>>>>>>>> >> successful writes and all failed writes is useful for users so
>>>>>>>>>> they can
>>>>>>>>>> >> chain an action which depends on what was or wasn't
>>>>>>>>>> successfully written.
>>>>>>>>>> >> Users have requested adding retry/failure handling policies to
>>>>>>>>>> sinks so that
>>>>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>>>>> >>
>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>>>>>>>>> [email protected]>
>>>>>>>>>> >> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection
>>>>>>>>>> >>> makes all of this easier, so that arbitrary additional functions
>>>>>>>>>> >>> can be added. But what exactly would write functions return in a
>>>>>>>>>> >>> PCollection that would make sense? The whole idea is that we’ve
>>>>>>>>>> >>> written to an external source and now the collection itself is
>>>>>>>>>> >>> no longer needed.
>>>>>>>>>> >>>
>>>>>>>>>> >>> Currently, that’s represented with a PDone, but that doesn’t
>>>>>>>>>> >>> allow any work to occur after it. I see a couple of possible ways
>>>>>>>>>> >>> of handling this given this conversation, and am curious which
>>>>>>>>>> >>> solution sounds like the best way to deal with the problem:
>>>>>>>>>> >>>
>>>>>>>>>> >>> 1. Have output transforms always return something specific
>>>>>>>>>> (which would
>>>>>>>>>> >>> be the same across transforms by convention), that is in the
>>>>>>>>>> form of a
>>>>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>>>>> >>>
>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a
>>>>>>>>>> PCollection so we
>>>>>>>>>> >>> can run applies afterward.
>>>>>>>>>> >>>
>>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback
>>>>>>>>>> function
>>>>>>>>>> >>> which runs after the transform is complete.
>>>>>>>>>> >>>
>>>>>>>>>> >>> I went through these gymnastics recently when I was trying to
>>>>>>>>>> build
>>>>>>>>>> >>> something that would move indices after writing to Algolia,
>>>>>>>>>> and the solution
>>>>>>>>>> >>> was to co-opt code from the old Sink class that used to exist
>>>>>>>>>> in Beam. The
>>>>>>>>>> >>> problem is that particular method requires the output
>>>>>>>>>> transform in question
>>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t
>>>>>>>>>> make sense to
>>>>>>>>>> >>> return one. This seems like a bad solution, but unfortunately
>>>>>>>>>> there isn’t a
>>>>>>>>>> >>> notion of a transform that has no explicit output that needs
>>>>>>>>>> to have
>>>>>>>>>> >>> operations occur after it.
>>>>>>>>>> >>>
>>>>>>>>>> >>> The three potential solutions above address this issue, but I
>>>>>>>>>> >>> would like to hear which would be preferable (or perhaps a
>>>>>>>>>> >>> different proposal altogether?). Perhaps we could also start up a
>>>>>>>>>> >>> ticket on this, since it seems like a worthwhile feature
>>>>>>>>>> >>> addition. I would find it really useful, for one.
>>>>>>>>>> >>>
>>>>>>>>>> >>> Chet
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is
>>>>>>>>>> returned
>>>>>>>>>> >>> containing the result of the sink so that any arbitrary
>>>>>>>>>> additional functions
>>>>>>>>>> >>> can be applied.
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>>>>>>>>> [email protected]>
>>>>>>>>>> >>> wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than
>>>>>>>>>> in the
>>>>>>>>>> >>>> main.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Regards
>>>>>>>>>> >>>> JB
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> I do something almost exactly like this, but with
>>>>>>>>>> BigtableIO instead.
>>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I
>>>>>>>>>> need to finish this
>>>>>>>>>> >>>>> up...).  It would really be nice for most IOs to support
>>>>>>>>>> something like
>>>>>>>>>> >>>>> this.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the
>>>>>>>>>> output from
>>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function which
>>>>>>>>>> will run when
>>>>>>>>>> >>>>> all writes finish.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> You probably want to avoid doing something in the main
>>>>>>>>>> method because
>>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver
>>>>>>>>>> will die, get
>>>>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <
>>>>>>>>>> [email protected]
>>>>>>>>>> >>>>> <mailto:[email protected]>> wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>     Assuming you're in Java. You could just follow on in
>>>>>>>>>> your Main
>>>>>>>>>> >>>>> method.
>>>>>>>>>> >>>>>     Checking the state of the Result.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>     Example:
>>>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>>>> >>>>>     try {
>>>>>>>>>> >>>>>       // Block until the pipeline reaches a terminal state.
>>>>>>>>>> >>>>>       result.waitUntilFinish();
>>>>>>>>>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>> >>>>>         // DO ES work
>>>>>>>>>> >>>>>       }
>>>>>>>>>> >>>>>     } catch (Exception e) {
>>>>>>>>>> >>>>>       // Cancel the pipeline on failure, then rethrow.
>>>>>>>>>> >>>>>       result.cancel();
>>>>>>>>>> >>>>>       throw e;
>>>>>>>>>> >>>>>     }
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a work
>>>>>>>>>> flow.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>>> >>>>> <[email protected]
>>>>>>>>>> >>>>>     <mailto:[email protected]>> wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         Hi,
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         We could imagine having a user callback fn fired when
>>>>>>>>>> >>>>>         the sink batch is complete.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         Let me think about that.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         Regards
>>>>>>>>>> >>>>>         JB
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>             Hey JB,
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>>>> >>>>>             I suppose in that case I would need a way of
>>>>>>>>>> monitoring
>>>>>>>>>> >>>>> when the ES
>>>>>>>>>> >>>>>             transform completes successfully before I can
>>>>>>>>>> proceed with
>>>>>>>>>> >>>>> doing the
>>>>>>>>>> >>>>>             swap.
>>>>>>>>>> >>>>>             The problem with this is that I can't think of
>>>>>>>>>> a good way
>>>>>>>>>> >>>>> to
>>>>>>>>>> >>>>>             determine that termination state short of
>>>>>>>>>> polling the new
>>>>>>>>>> >>>>> index to
>>>>>>>>>> >>>>>             check the document count compared to the size
>>>>>>>>>> of input
>>>>>>>>>> >>>>> PCollection.
>>>>>>>>>> >>>>>             That, or maybe I'd need to use an external
>>>>>>>>>> system like you
>>>>>>>>>> >>>>> mentioned
>>>>>>>>>> >>>>>             to poll on the state of the pipeline (I'm using
>>>>>>>>>> Google
>>>>>>>>>> >>>>> Dataflow, so
>>>>>>>>>> >>>>>             maybe there's a way to do this with some API).
>>>>>>>>>> >>>>>             But I would have thought that there would be an
>>>>>>>>>> easy way of
>>>>>>>>>> >>>>> simply
>>>>>>>>>> >>>>>             saying "do not process this transform until
>>>>>>>>>> this other
>>>>>>>>>> >>>>> transform
>>>>>>>>>> >>>>>             completes".
>>>>>>>>>> >>>>>             Is there no established way of "signaling"
>>>>>>>>>> between
>>>>>>>>>> >>>>> pipelines when
>>>>>>>>>> >>>>>             some pipeline completes, or have some way of
>>>>>>>>>> declaring a
>>>>>>>>>> >>>>> dependency
>>>>>>>>>> >>>>>             of 1 transform on another transform?
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>             Thanks again,
>>>>>>>>>> >>>>>             Philip
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste
>>>>>>>>>> >>>>>             Onofré <[email protected]> wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  You won't be able to do (3) in the same
>>>>>>>>>> pipeline as
>>>>>>>>>> >>>>> the
>>>>>>>>>> >>>>>             Elasticsearch Sink
>>>>>>>>>> >>>>>                  PTransform ends the pipeline with PDone.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  So, (3) has to be done in another pipeline
>>>>>>>>>> (using a
>>>>>>>>>> >>>>> DoFn) or in
>>>>>>>>>> >>>>>             another
>>>>>>>>>> >>>>>                  "system" (like Camel for instance). I
>>>>>>>>>> would do a check
>>>>>>>>>> >>>>> of the
>>>>>>>>>> >>>>>             data in the
>>>>>>>>>> >>>>>                  index and then trigger the swap there.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  Regards
>>>>>>>>>> >>>>>                  JB
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      Hi,
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been
>>>>>>>>>> trying to
>>>>>>>>>> >>>>> use the
>>>>>>>>>> >>>>>             ElasticSearchIO
>>>>>>>>>> >>>>>                      sink to write docs into ES.
>>>>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>>>>> >>>>>                      1. ingest and transform rows from DB
>>>>>>>>>> (done)
>>>>>>>>>> >>>>>                      2. write JSON docs/strings into a new
>>>>>>>>>> ES index
>>>>>>>>>> >>>>> (done)
>>>>>>>>>> >>>>>                      3. After (2) is complete and all
>>>>>>>>>> documents are
>>>>>>>>>> >>>>> written into
>>>>>>>>>> >>>>>             a new index,
>>>>>>>>>> >>>>>                      trigger an atomic index swap under an
>>>>>>>>>> alias to
>>>>>>>>>> >>>>> replace the
>>>>>>>>>> >>>>>             current
>>>>>>>>>> >>>>>                      aliased index with the new index
>>>>>>>>>> generated in step
>>>>>>>>>> >>>>> 2. This
>>>>>>>>>> >>>>>             is basically
>>>>>>>>>> >>>>>                      a single POST request to the ES
>>>>>>>>>> cluster.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      The problem I'm facing is that I can't
>>>>>>>>>> >>>>>                      find a way to make (3) happen after step
>>>>>>>>>> >>>>>                      (2) is complete.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      The ElasticSearchIO.Write transform
>>>>>>>>>> returns a
>>>>>>>>>> >>>>> PDone, and
>>>>>>>>>> >>>>>             I'm not sure
>>>>>>>>>> >>>>>                      how to proceed from there because it
>>>>>>>>>> doesn't seem
>>>>>>>>>> >>>>> to let me
>>>>>>>>>> >>>>>             do another
>>>>>>>>>> >>>>>                      apply on it to "define" a dependency.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      Is there a recommended way to construct
>>>>>>>>>> >>>>>                      pipeline workflows like this?
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>>>> >>>>>                      Philip
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>                  --     Jean-Baptiste Onofré
>>>>>>>>>> >>>>>             [email protected] <mailto:[email protected]>
>>>>>>>>>> >>>>>             http://blog.nanthrax.net
>>>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>         --         Jean-Baptiste Onofré
>>>>>>>>>> >>>>>         [email protected] <mailto:[email protected]>
>>>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>>>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>>>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> --
>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>> >>>> [email protected]
>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>
