I think it is also well-defined for the on-time (watermark) pane, as there
is always exactly one. In general I think sequencing is well defined for
three panes:

1. The first pane fired (this is how side inputs are sequenced).
2. The on-time pane, as there is only one.
3. The final pane.
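
To make the three cases concrete, here is a rough toy model in plain Java. It is not Beam code: the Pane and Timing types and the helper names are made up for this sketch, and it simply assumes a window's firings are available as an ordered list with at most one on-time pane.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: Pane and Timing are hypothetical stand-ins, not Beam's PaneInfo.
final class PaneSequencing {
    enum Timing { EARLY, ON_TIME, LATE }

    static final class Pane {
        final int index;       // position of the firing within its window
        final Timing timing;   // how the firing relates to the watermark
        final boolean isLast;  // true only for the final pane of the window

        Pane(int index, Timing timing, boolean isLast) {
            this.index = index;
            this.timing = timing;
            this.isLast = isLast;
        }
    }

    // 1. The first pane fired for the window, or null if nothing has fired.
    static Pane firstFired(List<Pane> panes) {
        return panes.isEmpty() ? null : panes.get(0);
    }

    // 2. The on-time pane (assumed to occur at most once per window), or null.
    static Pane onTime(List<Pane> panes) {
        for (Pane p : panes) {
            if (p.timing == Timing.ON_TIME) {
                return p;
            }
        }
        return null;
    }

    // 3. The final pane, or null if the window has not produced one
    //    (for example a window that never closes).
    static Pane finalPane(List<Pane> panes) {
        for (Pane p : panes) {
            if (p.isLast) {
                return p;
            }
        }
        return null;
    }

    // Example firings for one window: early, on-time, then two late panes.
    static List<Pane> examplePanes() {
        return Arrays.asList(
            new Pane(0, Timing.EARLY, false),
            new Pane(1, Timing.ON_TIME, false),
            new Pane(2, Timing.LATE, false),
            new Pane(3, Timing.LATE, true));
    }

    public static void main(String[] args) {
        List<Pane> panes = examplePanes();
        System.out.println("first=" + firstFired(panes).index
            + " onTime=" + onTime(panes).index
            + " final=" + finalPane(panes).index);
    }
}
```

Any pane other than these three has no unambiguous counterpart to line up against, which is why finalPane returning null corresponds to the "no final pane" case discussed below.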



On Thu, Dec 21, 2017 at 5:49 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> Yeah. And I don't think there's a good way to define what sequencing even
> means, if the sink is returning results in windows that aren't gonna have a
> final pane.
>
> On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:
>
>> This is only for "final pane" waiting, correct? So someone who writes a
>> sink in the global window probably would not want to use this.
>>
>> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> PR is out https://github.com/apache/beam/pull/4301
>>>
>>> This should allow us to have useful sequencing for sinks like BigtableIO
>>> / BigQueryIO.
>>>
>>> Adding a couple of interested parties:
>>> - Steve, would you be interested in using this in
>>> https://github.com/apache/beam/pull/3997 ?
>>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
>>> in particular, this works properly in case the input can be firing multiple
>>> times.
>>>
>>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> I figured out the Never.ever() approach and it seems to work. Will
>>>> finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like
>>>> this will be quite a useful transform.
>>>>
>>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> I'm a bit confused by all of these suggestions: they sound plausible
>>>>> at a high level, but I'm having a hard time making any one of them
>>>>> concrete.
>>>>>
>>>>> So suppose we want to create a transform Wait.on(PCollection<?>
>>>>> signal): PCollection<T> -> PCollection<T>.
>>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical
>>>>> to "a", but buffers panes of "a" in any given window until the final pane
>>>>> of "sig" in the same window is fired (or, if it's never fired, until the
>>>>> window closes? could use a deadletter for that maybe).
>>>>>
>>>>> This transform I suppose would need to have a keyed and unkeyed
>>>>> version.
>>>>>
>>>>> The keyed version would support merging window fns, and would require
>>>>> "a" and "sig" to be keyed by the same key, and would work using a CoGbk -
>>>>> followed by a stateful ParDo? Or is there a way to get away without a
>>>>> stateful ParDo here? (not all runners support it)
>>>>>
>>>>> The unkeyed version would not support merging window fns. Reuven, can
>>>>> you elaborate how your combiner idea would work here - in particular, what
>>>>> do you mean by "triggering only on the final pane"? Do you mean filter
>>>>> non-final panes before entering the combiner? I wonder if that'll work,
>>>>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on
>>>>> the side input with a Never.ever() trigger"?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> This is an interesting point.
>>>>>>
>>>>>> In the past, we've often just thought about sequencing some action to
>>>>>> take place after the sink, in which case you can simply use the sink
>>>>>> output as a main input. However, if you want to run a transform with
>>>>>> another PCollection as a main input, this doesn't work. And as you've
>>>>>> discovered, triggered side inputs are defined to be non-deterministic,
>>>>>> and there's no way to make things line up.
>>>>>>
>>>>>> What you're describing only makes sense if you're blocking against
>>>>>> the final pane (since otherwise there's no reasonable way to match up
>>>>>> somePC panes with the sink panes). There are multiple ways you can do
>>>>>> this: one would be to CoGroupByKey (CGBK) the two PCollections together,
>>>>>> and trigger the new transform only on the final pane. Another would be
>>>>>> to add a combiner that returns a Void, triggering only on the final
>>>>>> pane, and then make this singleton Void a side input. You could also do
>>>>>> something explicit with the state API.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>>>
>>>>>>> Suppose we have a PCollection "donePanes" with an element per
>>>>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>>>>>> data has been written; this pane is: final / non-final".
>>>>>>>
>>>>>>> Suppose we want to use this to ensure that
>>>>>>> somePc.apply(ParDo.of(fn)) happens only after the final pane has been
>>>>>>> written.
>>>>>>>
>>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInputs(c)) to
>>>>>>> happen when c emits a *final* pane.
>>>>>>>
>>>>>>> Unfortunately, using ParDo.of(fn).withSideInputs(
>>>>>>> donePanes.apply(View.asSingleton())) doesn't do the trick: the side
>>>>>>> input becomes ready the moment *the first* pane of data has been
>>>>>>> written.
>>>>>>>
>>>>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>>>>>> only final panes...).apply(View.asSingleton())). It also becomes
>>>>>>> ready the moment *the first* pane has been written, you just get an
>>>>>>> exception if you access the side input before the *final* pane was
>>>>>>> written.
>>>>>>>
>>>>>>> I can't think of a pure-Beam solution to this: either "donePanes"
>>>>>>> will be used as a main input to something (and then everything else can
>>>>>>> only be a side input, which is not general enough), or it will be used
>>>>>>> as a side input (and then we can't achieve "trigger only after the
>>>>>>> final pane fires").
>>>>>>>
>>>>>>> It seems that we need a way to control the side input pushback, and
>>>>>>> configure whether a view becomes ready when its first pane has fired or
>>>>>>> when its last pane has fired. I could see this be a property on the View
>>>>>>> transform itself. In terms of implementation - I tried to figure out how
>>>>>>> side input readiness is determined, in the direct runner and Dataflow
>>>>>>> runner, and I'm completely lost and would appreciate some help.
>>>>>>>
>>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> This sounds great!
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>>>>>
>>>>>>>>> I find it helpful to think about the different things that may
>>>>>>>>> come after a sink. For instance:
>>>>>>>>>
>>>>>>>>> 1. It might be helpful to have a collection of failing input
>>>>>>>>> elements. The type of failed elements is pretty straightforward --
>>>>>>>>> just the input elements. This allows handling such failures by
>>>>>>>>> directing them elsewhere or performing additional processing.
>>>>>>>>>
>>>>>>>>
>>>>>>>> BigQueryIO already does this as you point out.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2. For a sink that produces a series of files, it might be useful
>>>>>>>>> to have a collection of the file names that have been completely
>>>>>>>>> written. This allows performing additional handling on these
>>>>>>>>> completed segments.
>>>>>>>>>
>>>>>>>>
>>>>>>>> In fact we already do this for FileBasedSinks. See
>>>>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>>>
>>>>>>>>>
>>>>>>>>> 3. For a sink that updates some destination, it would be
>>>>>>>>> reasonable to have a collection that provides (periodically) output
>>>>>>>>> indicating how complete the information written to that destination
>>>>>>>>> is. For instance, this might be something like "<this bigquery table>
>>>>>>>>> has all of the elements up to <input watermark>". This allows
>>>>>>>>> tracking how much information has been completely written out.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>>>>>> knowledge.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think those concepts map to the more detailed description Eugene
>>>>>>>>> provided, but I find it helpful to focus on what information comes
>>>>>>>>> out of the sink and how it might be used.
>>>>>>>>>
>>>>>>>>> Were there any use cases the above misses? Any functionality that
>>>>>>>>> has been described that doesn't map to these use cases?
>>>>>>>>>
>>>>>>>>> -- Ben
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> It makes sense to consider how this maps onto existing kinds of
>>>>>>>>>> sinks.
>>>>>>>>>>
>>>>>>>>>> E.g.:
>>>>>>>>>> - Something that just makes an RPC per record, e.g.
>>>>>>>>>> MqttIO.write(): that will emit 1 result per bundle (either a bogus
>>>>>>>>>> value or number of records written) that will be Combine'd into 1
>>>>>>>>>> result per pane of input. A user can sequence against this and be
>>>>>>>>>> notified when some intermediate amount of data has been written for
>>>>>>>>>> a window, or (via .isFinal()) when all of it has been written.
>>>>>>>>>> - Something that e.g. initiates an import job, such as
>>>>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up
>>>>>>>>>> atomic index swap: should emit 1 result per import job, e.g.
>>>>>>>>>> containing information about the job (e.g. its id and statistics).
>>>>>>>>>> Role of panes is the same.
>>>>>>>>>> - Something like above but that supports dynamic destinations:
>>>>>>>>>> like in WriteFiles, result will be
>>>>>>>>>> PCollection<KV<DestinationT, ResultT>> where ResultT may be
>>>>>>>>>> something like a list of files that were written for this pane of
>>>>>>>>>> this destination.
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree that the proper API for enabling the use case "do
>>>>>>>>>>> something after the data has been written" is to return a
>>>>>>>>>>> PCollection of objects where each object represents the result of
>>>>>>>>>>> writing some identifiable subset of the data. Then one can apply a
>>>>>>>>>>> ParDo to this PCollection, in order to "do something after this
>>>>>>>>>>> subset has been written".
>>>>>>>>>>>
>>>>>>>>>>> The challenging part here is *identifying* the subset of the
>>>>>>>>>>> data that's been written, in a way consistent with Beam's unified
>>>>>>>>>>> batch/streaming model, where saying "all data has been written" is
>>>>>>>>>>> not an option because more data can arrive.
>>>>>>>>>>>
>>>>>>>>>>> The next choice is "a window of input has been written", but
>>>>>>>>>>> then again, late data can arrive into a window as well.
>>>>>>>>>>>
>>>>>>>>>>> Next choice after that is "a pane of input has been written",
>>>>>>>>>>> but per https://s.apache.org/beam-sink-triggers the term "pane
>>>>>>>>>>> of input" is moot: triggering and panes should be something private
>>>>>>>>>>> to the sink, and the same input can trigger different sinks
>>>>>>>>>>> differently. The hypothetical different accumulation modes make
>>>>>>>>>>> this trickier still. I'm not sure whether we intend to also
>>>>>>>>>>> challenge the idea that windowing is inherent to the collection, or
>>>>>>>>>>> whether it too should be specified on a transform that processes
>>>>>>>>>>> the collection. I think for the sake of this discussion we can
>>>>>>>>>>> assume that it's inherent, and assume the mental model that the
>>>>>>>>>>> elements in different windows of a PCollection are processed
>>>>>>>>>>> independently - "as if" there were multiple pipelines processing
>>>>>>>>>>> each window.
>>>>>>>>>>>
>>>>>>>>>>> Overall, embracing the full picture, we end up with something
>>>>>>>>>>> like this:
>>>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or
>>>>>>>>>>> sliding windows), the below applies to the entire contents of the
>>>>>>>>>>> PCollection. If it's merging (e.g. session windows), then it applies
>>>>>>>>>>> per-key, and the input should be (perhaps implicitly) keyed in a
>>>>>>>>>>> way that the sink understands - for example, the grouping by
>>>>>>>>>>> destination in DynamicDestinations in file and bigquery writes.
>>>>>>>>>>> - Each window's contents is a "changelog" - stream of elements
>>>>>>>>>>> and retractions.
>>>>>>>>>>> - A "sink" processes each window of the collection, deciding how
>>>>>>>>>>> to handle elements and retractions (and whether to support
>>>>>>>>>>> retractions at all) in a sink-specific way, and deciding *when* to
>>>>>>>>>>> perform the side effects for a portion of the changelog (a "pane")
>>>>>>>>>>> based on the sink's triggering strategy.
>>>>>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list
>>>>>>>>>>> of filenames that have been written, or simply a number of records
>>>>>>>>>>> that was written, or a bogus void value etc. The result will
>>>>>>>>>>> implicitly include the window of the input it's associated with. It
>>>>>>>>>>> will also implicitly include pane information - index of the pane
>>>>>>>>>>> in this window, and whether this is the first or last pane.
>>>>>>>>>>> - The partitioning into bundles is an implementation detail and
>>>>>>>>>>> not very useful, so before presenting the pane write results to the
>>>>>>>>>>> user, the sink will probably want to Combine the bundle results so
>>>>>>>>>>> that there ends up being 1 value for each pane that was written.
>>>>>>>>>>> Once again note that panes may be associated with windows of the
>>>>>>>>>>> input as a whole, but if the input is keyed (like with
>>>>>>>>>>> DynamicDestinations) they'll be associated with per-key subsets of
>>>>>>>>>>> windows of the input.
>>>>>>>>>>> - This combining requires an extra, well, combining operation,
>>>>>>>>>>> so it should be optional.
>>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or
>>>>>>>>>>> a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and
>>>>>>>>>>> ResultT, where the elements of this collection will implicitly have
>>>>>>>>>>> window and pane information, available via the implicit
>>>>>>>>>>> BoundedWindow and PaneInfo.
>>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace
>>>>>>>>>>> the fact that trigger strategy is set on the input. But in that
>>>>>>>>>>> case the user will have to accept that the PaneInfo of ResultT's is
>>>>>>>>>>> not necessarily directly related to panes of the input - the sink
>>>>>>>>>>> is allowed to do internal aggregation as an implementation detail,
>>>>>>>>>>> which may modify the triggering strategy. Basically the user will
>>>>>>>>>>> still get sink-assigned panes.
>>>>>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>>>>>> being notified of "no more data associated with this window will be
>>>>>>>>>>> written", so the user will ignore all ResultT's except those where
>>>>>>>>>>> the pane is marked final. If a user is interested in being notified
>>>>>>>>>>> of intermediate write results - they'll have to embrace the fact
>>>>>>>>>>> that they cannot identify the precise subset of input associated
>>>>>>>>>>> with the intermediate result.
>>>>>>>>>>>
>>>>>>>>>>> I think the really key points of the above are:
>>>>>>>>>>> - Sinks should support windowed input. Sinks should write
>>>>>>>>>>> different windows of input independently. If the sink can write
>>>>>>>>>>> multi-destination input, the destination should function as a
>>>>>>>>>>> grouping key, and in that case merging windowing should be allowed.
>>>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in
>>>>>>>>>>> the window of the input that was written, and have a PaneInfo
>>>>>>>>>>> assigned by the sink, of which probably the only part useful to the
>>>>>>>>>>> user is whether it's .isFinal().
>>>>>>>>>>>
>>>>>>>>>>> Does this sound reasonable?
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <
>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1
>>>>>>>>>>>>
>>>>>>>>>>>> At the very least an empty PCollection<?> could be produced with
>>>>>>>>>>>> no promises about its contents but the ability to be followed
>>>>>>>>>>>> (e.g. as a side input), which is forward compatible with whatever
>>>>>>>>>>>> actual metadata one may decide to produce in the future.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <
>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>> > +dev@
>>>>>>>>>>>> >
>>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>>>>>>>>>>> > understand and a good way for an IO to communicate and establish causal
>>>>>>>>>>>> > dependencies. Converting an IO from PDone to real output may spur further
>>>>>>>>>>>> > useful thoughts based on the design decisions about what sort of output is
>>>>>>>>>>>> > most useful.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Kenn
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I think all sinks actually do have valuable information to output which
>>>>>>>>>>>> >> can be used after a write (file names, transaction/commit/row ids, table
>>>>>>>>>>>> >> names, ...). In addition to this metadata, having a PCollection of all
>>>>>>>>>>>> >> successful writes and all failed writes is useful for users so they can
>>>>>>>>>>>> >> chain an action which depends on what was or wasn't successfully written.
>>>>>>>>>>>> >> Users have requested adding retry/failure handling policies to sinks so that
>>>>>>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com>
>>>>>>>>>>>> >> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all
>>>>>>>>>>>> >>> of this easier so that arbitrary additional functions can be added, but what
>>>>>>>>>>>> >>> exactly would write functions be returning in a PCollection that would make
>>>>>>>>>>>> >>> sense? The whole idea is that we’ve written to an external source and now
>>>>>>>>>>>> >>> the collection itself is no longer needed.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Currently, that’s represented with a PDone, but that doesn’t
>>>>>>>>>>>> >>> allow any work to occur after it. I see a couple possible ways of handling
>>>>>>>>>>>> >>> this given this conversation, and am curious which solution sounds like the
>>>>>>>>>>>> >>> best way to deal with the problem:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 1. Have output transforms always return something specific (which would
>>>>>>>>>>>> >>> be the same across transforms by convention), that is in the form of a
>>>>>>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a PCollection so we
>>>>>>>>>>>> >>> can run applies afterward.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function
>>>>>>>>>>>> >>> which runs after the transform is complete.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> I went through these gymnastics recently when I was trying to build
>>>>>>>>>>>> >>> something that would move indices after writing to Algolia, and the solution
>>>>>>>>>>>> >>> was to co-opt code from the old Sink class that used to exist in Beam. The
>>>>>>>>>>>> >>> problem is that particular method requires the output transform in question
>>>>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t make sense to
>>>>>>>>>>>> >>> return one. This seems like a bad solution, but unfortunately there isn’t a
>>>>>>>>>>>> >>> notion of a transform that has no explicit output that needs to have
>>>>>>>>>>>> >>> operations occur after it.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> The three potential solutions above address this issue, but I would like
>>>>>>>>>>>> >>> to hear which would be preferable (or perhaps a different proposal
>>>>>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on this, since it
>>>>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it really useful, for
>>>>>>>>>>>> >>> one.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Chet
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned
>>>>>>>>>>>> >>> containing the result of the sink so that any arbitrary additional functions
>>>>>>>>>>>> >>> can be applied.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>>>>>> >>> wrote:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than in the
>>>>>>>>>>>> >>>> main.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Regards
>>>>>>>>>>>> >>>> JB
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead.
>>>>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I need to finish this
>>>>>>>>>>>> >>>>> up...). It would really be nice for most IOs to support something like
>>>>>>>>>>>> >>>>> this.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>>>>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function which will run when
>>>>>>>>>>>> >>>>> all writes finish.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main method because
>>>>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver will die, get
>>>>>>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     Assuming you're in Java, you could just follow on in your main
>>>>>>>>>>>> >>>>>     method, checking the state of the result.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     Example:
>>>>>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>>>>>> >>>>>     try {
>>>>>>>>>>>> >>>>>       // Block until the pipeline reaches a terminal state.
>>>>>>>>>>>> >>>>>       result.waitUntilFinish();
>>>>>>>>>>>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>>> >>>>>         // DO ES work
>>>>>>>>>>>> >>>>>       }
>>>>>>>>>>>> >>>>>     } catch (Exception e) {
>>>>>>>>>>>> >>>>>       // Stop the pipeline on failure, then surface the error.
>>>>>>>>>>>> >>>>>       result.cancel();
>>>>>>>>>>>> >>>>>       throw e;
>>>>>>>>>>>> >>>>>     }
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a workflow.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>>     <j...@nanthrax.net> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         Hi,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         We could imagine having a user callback fn fired when the
>>>>>>>>>>>> >>>>>         sink batch is complete.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         Let me think about that.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         Regards
>>>>>>>>>>>> >>>>>         JB
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>             Hey JB,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>>>>>> >>>>>             I suppose in that case I would need a way of monitoring
>>>>>>>>>>>> >>>>>             when the ES transform completes successfully before I can
>>>>>>>>>>>> >>>>>             proceed with doing the swap.
>>>>>>>>>>>> >>>>>             The problem with this is that I can't think of a good way
>>>>>>>>>>>> >>>>>             to determine that termination state short of polling the
>>>>>>>>>>>> >>>>>             new index to check the document count compared to the size
>>>>>>>>>>>> >>>>>             of the input PCollection.
>>>>>>>>>>>> >>>>>             That, or maybe I'd need to use an external system like you
>>>>>>>>>>>> >>>>>             mentioned to poll on the state of the pipeline (I'm using
>>>>>>>>>>>> >>>>>             Google Dataflow, so maybe there's a way to do this with
>>>>>>>>>>>> >>>>>             some API).
>>>>>>>>>>>> >>>>>             But I would have thought that there would be an easy way
>>>>>>>>>>>> >>>>>             of simply saying "do not process this transform until this
>>>>>>>>>>>> >>>>>             other transform completes".
>>>>>>>>>>>> >>>>>             Is there no established way of "signaling" between
>>>>>>>>>>>> >>>>>             pipelines when some pipeline completes, or some way of
>>>>>>>>>>>> >>>>>             declaring a dependency of one transform on another?
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>             Thanks again,
>>>>>>>>>>>> >>>>>             Philip
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>>             <j...@nanthrax.net> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  You won't be able to do (3) in the same pipeline, as
>>>>>>>>>>>> >>>>>                  the Elasticsearch Sink PTransform ends the pipeline
>>>>>>>>>>>> >>>>>                  with PDone.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  So, (3) has to be done in another pipeline (using a
>>>>>>>>>>>> >>>>>                  DoFn) or in another "system" (like Camel for instance).
>>>>>>>>>>>> >>>>>                  I would do a check of the data in the index and then
>>>>>>>>>>>> >>>>>                  trigger the swap there.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  Regards
>>>>>>>>>>>> >>>>>                  JB
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      Hi,
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been trying to
>>>>>>>>>>>> >>>>>                      use the ElasticsearchIO sink to write docs into ES.
>>>>>>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>>>>>>> >>>>>                      1. ingest and transform rows from DB (done)
>>>>>>>>>>>> >>>>>                      2. write JSON docs/strings into a new ES index
>>>>>>>>>>>> >>>>>                      (done)
>>>>>>>>>>>> >>>>>                      3. After (2) is complete and all documents are
>>>>>>>>>>>> >>>>>                      written into a new index, trigger an atomic index
>>>>>>>>>>>> >>>>>                      swap under an alias to replace the current aliased
>>>>>>>>>>>> >>>>>                      index with the new index generated in step 2. This
>>>>>>>>>>>> >>>>>                      is basically a single POST request to the ES
>>>>>>>>>>>> >>>>>                      cluster.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      The problem I'm facing is that I don't seem to be
>>>>>>>>>>>> >>>>>                      able to find a way for (3) to happen after step (2)
>>>>>>>>>>>> >>>>>                      is complete.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      The ElasticsearchIO.Write transform returns a
>>>>>>>>>>>> >>>>>                      PDone, and I'm not sure how to proceed from there
>>>>>>>>>>>> >>>>>                      because it doesn't seem to let me do another apply
>>>>>>>>>>>> >>>>>                      on it to "define" a dependency.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      Is there a recommended way to construct pipeline
>>>>>>>>>>>> >>>>>                      workflows like this?
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>>>>>> >>>>>                      Philip
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>                  --
>>>>>>>>>>>> >>>>>                  Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>>                  jbono...@apache.org
>>>>>>>>>>>> >>>>>                  http://blog.nanthrax.net
>>>>>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>         --
>>>>>>>>>>>> >>>>>         Jean-Baptiste Onofré
>>>>>>>>>>>> >>>>>         jbono...@apache.org
>>>>>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     --
>>>>>>>>>>>> >>>>>     Nick Verbeck - NerdyNick
>>>>>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>>>>>> >>>>>     NerdyNick.com
>>>>>>>>>>>> >>>>>     TrailsOffroad.com
>>>>>>>>>>>> >>>>>     NoKnownBoundaries.com
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> --
>>>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>
>>
