Yeah. And I don't think there's a good way to define what sequencing even
means, if the sink is returning results in windows that aren't gonna have a
final pane.

On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:

> This is only for "final pane" waiting, correct? So someone who writes a
> sink in the global window probably would not want to use this.
>
> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> PR is out https://github.com/apache/beam/pull/4301
>>
>> This should allow us to have useful sequencing for sinks like BigtableIO
>> / BigQueryIO.
>>
>> Adding a couple of interested parties:
>> - Steve, would you be interested in using this in
>> https://github.com/apache/beam/pull/3997 ?
>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
>> in particular, this works properly in case the input can be firing multiple
>> times.
>>
>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> I figured out the Never.ever() approach and it seems to work. Will
>>> finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like
>>> this will be quite a useful transform.
>>>
>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> I'm a bit confused by all of these suggestions: they sound plausible at
>>>> a high level, but I'm having a hard time making any one of them concrete.
>>>>
>>>> So suppose we want to create a transform Wait.on(PCollection<?>
>>>> signal): PCollection<T> -> PCollection<T>.
>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
>>>> "a", but buffers panes of "a" in any given window until the final pane of
>>>> "sig" in the same window is fired (or, if it's never fired, until the
>>>> window closes? could use a deadletter for that maybe).
>>>>
>>>> This transform I suppose would need to have a keyed and unkeyed version.
>>>>
>>>> The keyed version would support merging window fns, and would require
>>>> "a" and "sig" to be keyed by the same key, and would work using a CoGbk -
>>>> followed by a stateful ParDo? Or is there a way to get away without a
>>>> stateful ParDo here? (not all runners support it)
>>>>
>>>> The unkeyed version would not support merging window fns. Reuven, can
>>>> you elaborate how your combiner idea would work here - in particular, what
>>>> do you mean by "triggering only on the final pane"? Do you mean filter
>>>> non-final panes before entering the combiner? I wonder if that'll work,
>>>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on
>>>> the side input with a Never.ever() trigger"?
>>>>
>>>> Thanks.
>>>>
>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> This is an interesting point.
>>>>>
>>>>> In the past, we've often just though about sequencing some action to
>>>>> take place after the sink, in which case you can simply use the sink 
>>>>> output
>>>>> as a main input. However if you want to run a transform with another
>>>>> PCollection as a main input, this doesn't work. And as you've discovered,
>>>>> triggered side inputs are defined to be non-deterministic, and there's no
>>>>> way to make things line up.
>>>>>
>>>>> What you're describing only makes sense if you're blocking against the
>>>>> final pane (since otherwise there's no reasonable way to match up somePC
>>>>> panes with the sink panes). There are multiple ways you can do this: one
>>>>> would be to CGBK the two PCollections together, and trigger the new
>>>>> transform only on the final pane. Another would be to add a combiner that
>>>>> returns a Void, triggering only on the final pane, and then make this
>>>>> singleton Void a side input. You could also do something explicit with the
>>>>> state API.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <
>>>>> kirpic...@google.com> wrote:
>>>>>
>>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>>
>>>>>> Suppose we have a PCollection "donePanes" with an element per
>>>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>>>>> data has been written; this pane is: final / non-final".
>>>>>>
>>>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>>>>> happens only after the final pane has been written.
>>>>>>
>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to
>>>>>> happen when c emits a *final* pane.
>>>>>>
>>>>>> Unfortunately, using
>>>>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't 
>>>>>> do
>>>>>> the trick: the side input becomes ready the moment *the first *pane
>>>>>> of data has been written.
>>>>>>
>>>>>> But neither does
>>>>>> ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final
>>>>>> panes...).apply(View.asSingleton())). It also becomes ready the moment 
>>>>>> *the
>>>>>> first* pane has been written, you just get an exception if you
>>>>>> access the side input before the *final* pane was written.
>>>>>>
>>>>>> I can't think of a pure-Beam solution to this: either "donePanes"
>>>>>> will be used as a main input to something (and then everything else can
>>>>>> only be a side input, which is not general enough), or it will be used 
>>>>>> as a
>>>>>> side input (and then we can't achieve "trigger only after the final pane
>>>>>> fires").
>>>>>>
>>>>>> It seems that we need a way to control the side input pushback, and
>>>>>> configure whether a view becomes ready when its first pane has fired or
>>>>>> when its last pane has fired. I could see this be a property on the View
>>>>>> transform itself. In terms of implementation - I tried to figure out how
>>>>>> side input readiness is determined, in the direct runner and Dataflow
>>>>>> runner, and I'm completely lost and would appreciate some help.
>>>>>>
>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> This sounds great!
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>>>>>>> ).
>>>>>>>>
>>>>>>>> I find it helpful to think about the different things that may come
>>>>>>>> after a sink. For instance:
>>>>>>>>
>>>>>>>> 1. It might be helpful to have a collection of failing input
>>>>>>>> elements. The type of failed elements is pretty straightforward -- 
>>>>>>>> just the
>>>>>>>> input elements. This allows handling such failures by directing them
>>>>>>>> elsewhere or performing additional processing.
>>>>>>>>
>>>>>>>
>>>>>>> BigQueryIO already does this as you point out.
>>>>>>>
>>>>>>>>
>>>>>>>> 2. For a sink that produces a series of files, it might be useful
>>>>>>>> to have a collection of the file names that have been completely 
>>>>>>>> written.
>>>>>>>> This allows performing additional handling on these completed segments.
>>>>>>>>
>>>>>>>
>>>>>>> In fact we already do this for FileBasedSinks.   See
>>>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>>
>>>>>>>>
>>>>>>>> 3. For a sink that updates some destination, it would be reasonable
>>>>>>>> to have a collection that provides (periodically) output indicating how
>>>>>>>> complete the information written to that destination is. For instance, 
>>>>>>>> this
>>>>>>>> might be something like "<this bigquery table> has all of the elements 
>>>>>>>> up
>>>>>>>> to <input watermark>" complete. This allows tracking how much 
>>>>>>>> information
>>>>>>>> has been completely written out.
>>>>>>>>
>>>>>>>
>>>>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>>>>> knowledge.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> I think those concepts map to the more detailed description Eugene
>>>>>>>> provided, but I find it helpful to focus on what information comes out 
>>>>>>>> of
>>>>>>>> the sink and how it might be used.
>>>>>>>>
>>>>>>>> Were there any use cases the above miss? Any functionality that has
>>>>>>>> been described that doesn't map to these use cases?
>>>>>>>>
>>>>>>>> -- Ben
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> It makes sense to consider how this maps onto existing kinds of
>>>>>>>>> sinks.
>>>>>>>>>
>>>>>>>>> E.g.:
>>>>>>>>> - Something that just makes an RPC per record, e.g.
>>>>>>>>> MqttIO.write(): that will emit 1 result per bundle (either a bogus 
>>>>>>>>> value or
>>>>>>>>> number of records written) that will be Combine'd into 1 result per 
>>>>>>>>> pane of
>>>>>>>>> input. A user can sequence against this and be notified when some
>>>>>>>>> intermediate amount of data has been written for a window, or (via
>>>>>>>>> .isFinal()) when all of it has been written.
>>>>>>>>> - Something that e.g. initiates an import job, such as
>>>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up 
>>>>>>>>> atomic
>>>>>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>>>>>> information about the job (e.g. its id and statistics). Role of panes 
>>>>>>>>> is
>>>>>>>>> the same.
>>>>>>>>> - Something like above but that supports dynamic destinations:
>>>>>>>>> like in WriteFiles, result will be PCollection<KV<DestinationT, 
>>>>>>>>> ResultT>>
>>>>>>>>> where ResultT may be something like a list of files that were written 
>>>>>>>>> for
>>>>>>>>> this pane of this destination.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I agree that the proper API for enabling the use case "do
>>>>>>>>>> something after the data has been written" is to return a 
>>>>>>>>>> PCollection of
>>>>>>>>>> objects where each object represents the result of writing some
>>>>>>>>>> identifiable subset of the data. Then one can apply a ParDo to this
>>>>>>>>>> PCollection, in order to "do something after this subset has been 
>>>>>>>>>> written".
>>>>>>>>>>
>>>>>>>>>> The challenging part here is *identifying* the subset of the data
>>>>>>>>>> that's been written, in a way consistent with Beam's unified
>>>>>>>>>> batch/streaming model, where saying "all data has been written" is 
>>>>>>>>>> not an
>>>>>>>>>> option because more data can arrive.
>>>>>>>>>>
>>>>>>>>>> The next choice is "a window of input has been written", but then
>>>>>>>>>> again, late data can arrive into a window as well.
>>>>>>>>>>
>>>>>>>>>> Next choice after that is "a pane of input has been written", but
>>>>>>>>>> per https://s.apache.org/beam-sink-triggers the term "pane of
>>>>>>>>>> input" is moot: triggering and panes should be something private to 
>>>>>>>>>> the
>>>>>>>>>> sink, and the same input can trigger different sinks differently. The
>>>>>>>>>> hypothetical different accumulation modes make this trickier still. 
>>>>>>>>>> I'm not
>>>>>>>>>> sure whether we intend to also challenge the idea that windowing is
>>>>>>>>>> inherent to the collection, or whether it too should be specified on 
>>>>>>>>>> a
>>>>>>>>>> transform that processes the collection. I think for the sake of this
>>>>>>>>>> discussion we can assume that it's inherent, and assume the mental 
>>>>>>>>>> model
>>>>>>>>>> that the elements in different windows of a PCollection are processed
>>>>>>>>>> independently - "as if" there were multiple pipelines processing each
>>>>>>>>>> window.
>>>>>>>>>>
>>>>>>>>>> Overall, embracing the full picture, we end up with something
>>>>>>>>>> like this:
>>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>>>>>>>>> windows), the below applies to the entire contents of the 
>>>>>>>>>> PCollection. If
>>>>>>>>>> it's merging (e.g. session windows), then it applies per-key, and 
>>>>>>>>>> the input
>>>>>>>>>> should be (perhaps implicitly) keyed in a way that the sink 
>>>>>>>>>> understands -
>>>>>>>>>> for example, the grouping by destination in DynamicDestinations in 
>>>>>>>>>> file and
>>>>>>>>>> bigquery writes.
>>>>>>>>>> - Each window's contents is a "changelog" - stream of elements
>>>>>>>>>> and retractions.
>>>>>>>>>> - A "sink" processes each window of the collection, deciding how
>>>>>>>>>> to handle elements and retractions (and whether to support 
>>>>>>>>>> retractions at
>>>>>>>>>> all) in a sink-specific way, and deciding *when* to perform the side
>>>>>>>>>> effects for a portion of the changelog (a "pane") based on the sink's
>>>>>>>>>> triggering strategy.
>>>>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list
>>>>>>>>>> of filenames that have been written, or simply a number of records 
>>>>>>>>>> that was
>>>>>>>>>> written, or a bogus void value etc. The result will implicitly 
>>>>>>>>>> include the
>>>>>>>>>> window of the input it's associated with. It will also implicitly 
>>>>>>>>>> include
>>>>>>>>>> pane information - index of the pane in this window, and whether 
>>>>>>>>>> this is
>>>>>>>>>> the first or last pane.
>>>>>>>>>> - The partitioning into bundles is an implementation detail and
>>>>>>>>>> not very useful, so before presenting the pane write results to the 
>>>>>>>>>> user,
>>>>>>>>>> the sink will probably want to Combine the bundle results so that 
>>>>>>>>>> there
>>>>>>>>>> ends up being 1 value for each pane that was written. Once again 
>>>>>>>>>> note that
>>>>>>>>>> panes may be associated with windows of the input as a whole, but if 
>>>>>>>>>> the
>>>>>>>>>> input is keyed (like with DynamicDestinations) they'll be associated 
>>>>>>>>>> with
>>>>>>>>>> per-key subsets of windows of the input.
>>>>>>>>>> - This combining requires an extra, well, combining operation, so
>>>>>>>>>> it should be optional.
>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a
>>>>>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, 
>>>>>>>>>> where
>>>>>>>>>> the elements of this collection will implicitly have window and pane
>>>>>>>>>> information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace
>>>>>>>>>> the fact that trigger strategy is set on the input. But in that case 
>>>>>>>>>> the
>>>>>>>>>> user will have to accept that the PaneInfo of ResultT's is not 
>>>>>>>>>> necessarily
>>>>>>>>>> directly related to panes of the input - the sink is allowed to do 
>>>>>>>>>> internal
>>>>>>>>>> aggregation as an implementation detail, which may modify the 
>>>>>>>>>> triggering
>>>>>>>>>> strategy. Basically the user will still get sink-assigned panes.
>>>>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>>>>> being notified of "no more data associated with this window will be
>>>>>>>>>> written", so the user will ignore all ResultT's except those where 
>>>>>>>>>> the pane
>>>>>>>>>> is marked final. If a user is interested in being notified of 
>>>>>>>>>> intermediate
>>>>>>>>>> write results - they'll have to embrace the fact that they cannot 
>>>>>>>>>> identify
>>>>>>>>>> the precise subset of input associated with the intermediate result.
>>>>>>>>>>
>>>>>>>>>> I think the really key points of the above are:
>>>>>>>>>> - Sinks should support windowed input. Sinks should write
>>>>>>>>>> different windows of input independently. If the sink can write
>>>>>>>>>> multi-destination input, the destination should function as a 
>>>>>>>>>> grouping key,
>>>>>>>>>> and in that case merging windowing should be allowed.
>>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in 
>>>>>>>>>> the
>>>>>>>>>> window of the input that was written, and have a PaneInfo assigned 
>>>>>>>>>> by the
>>>>>>>>>> sink, of which probably the only part useful to the user is whether 
>>>>>>>>>> it's
>>>>>>>>>> .isFinal().
>>>>>>>>>>
>>>>>>>>>> Does this sound reasonable?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> +1
>>>>>>>>>>>
>>>>>>>>>>> At the very least an empty PCollection<?> could be produced with
>>>>>>>>>>> no
>>>>>>>>>>> promises about its contents but the ability to be followed (e.g.
>>>>>>>>>>> as a
>>>>>>>>>>> side input), which is forward compatible with whatever actual
>>>>>>>>>>> metadata
>>>>>>>>>>> one may decide to produce in the future.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> > +dev@
>>>>>>>>>>> >
>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are
>>>>>>>>>>> easy to
>>>>>>>>>>> > understand and a good way for an IO to communicate and
>>>>>>>>>>> establish causal
>>>>>>>>>>> > dependencies. Converting an IO from PDone to real output may
>>>>>>>>>>> spur further
>>>>>>>>>>> > useful thoughts based on the design decisions about what sort
>>>>>>>>>>> of output is
>>>>>>>>>>> > most useful.
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> I think all sinks actually do have valuable information to
>>>>>>>>>>> output which
>>>>>>>>>>> >> can be used after a write (file names, transaction/commit/row
>>>>>>>>>>> ids, table
>>>>>>>>>>> >> names, ...). In addition to this metadata, having a
>>>>>>>>>>> PCollection of all
>>>>>>>>>>> >> successful writes and all failed writes is useful for users
>>>>>>>>>>> so they can
>>>>>>>>>>> >> chain an action which depends on what was or wasn't
>>>>>>>>>>> successfully written.
>>>>>>>>>>> >> Users have requested adding retry/failure handling policies
>>>>>>>>>>> to sinks so that
>>>>>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>>>>>>>>>> chet.aldr...@postmates.com>
>>>>>>>>>>> >> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> So I agree generally with the idea that returning a
>>>>>>>>>>> PCollection makes all
>>>>>>>>>>> >>> of this easier so that arbitrary additional functions can be
>>>>>>>>>>> added, what
>>>>>>>>>>> >>> exactly would write functions be returning in a PCollection
>>>>>>>>>>> that would make
>>>>>>>>>>> >>> sense? The whole idea is that we’ve written to an external
>>>>>>>>>>> source and now
>>>>>>>>>>> >>> the collection itself is no longer needed.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Currently, that’s represented with a PDone, but currently
>>>>>>>>>>> that doesn’t
>>>>>>>>>>> >>> allow any work to occur after it. I see a couple possible
>>>>>>>>>>> ways of handling
>>>>>>>>>>> >>> this given this conversation, and am curious which solution
>>>>>>>>>>> sounds like the
>>>>>>>>>>> >>> best way to deal with the problem:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> 1. Have output transforms always return something specific
>>>>>>>>>>> (which would
>>>>>>>>>>> >>> be the same across transforms by convention), that is in the
>>>>>>>>>>> form of a
>>>>>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a
>>>>>>>>>>> PCollection so we
>>>>>>>>>>> >>> can run applies afterward.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a
>>>>>>>>>>> callback function
>>>>>>>>>>> >>> which runs after the transform is complete.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I went through these gymnastics recently when I was trying
>>>>>>>>>>> to build
>>>>>>>>>>> >>> something that would move indices after writing to Algolia,
>>>>>>>>>>> and the solution
>>>>>>>>>>> >>> was to co-opt code from the old Sink class that used to
>>>>>>>>>>> exist in Beam. The
>>>>>>>>>>> >>> problem is that particular method requires the output
>>>>>>>>>>> transform in question
>>>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t
>>>>>>>>>>> make sense to
>>>>>>>>>>> >>> return one. This seems like a bad solution, but
>>>>>>>>>>> unfortunately there isn’t a
>>>>>>>>>>> >>> notion of a transform that has no explicit output that needs
>>>>>>>>>>> to have
>>>>>>>>>>> >>> operations occur after it.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The three potential solutions above address this issue, but
>>>>>>>>>>> I would like
>>>>>>>>>>> >>> to hear on which would be preferable (or perhaps a different
>>>>>>>>>>> proposal
>>>>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on
>>>>>>>>>>> this, since it
>>>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it
>>>>>>>>>>> really useful, for
>>>>>>>>>>> >>> one.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Chet
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Instead of a callback fn, its most useful if a PCollection
>>>>>>>>>>> is returned
>>>>>>>>>>> >>> containing the result of the sink so that any arbitrary
>>>>>>>>>>> additional functions
>>>>>>>>>>> >>> can be applied.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>>>>>>>>>> j...@nanthrax.net>
>>>>>>>>>>> >>> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more
>>>>>>>>>>> than in the
>>>>>>>>>>> >>>> main.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Regards
>>>>>>>>>>> >>>> JB
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> I do something almost exactly like this, but with
>>>>>>>>>>> BigtableIO instead.
>>>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I
>>>>>>>>>>> need to finish this
>>>>>>>>>>> >>>>> up...).  It would really be nice for most IOs to support
>>>>>>>>>>> something like
>>>>>>>>>>> >>>>> this.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the
>>>>>>>>>>> output from
>>>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function
>>>>>>>>>>> which will run when
>>>>>>>>>>> >>>>> all writes finish.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main
>>>>>>>>>>> method because
>>>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver
>>>>>>>>>>> will die, get
>>>>>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <
>>>>>>>>>>> nerdyn...@gmail.com
>>>>>>>>>>> >>>>> <mailto:nerdyn...@gmail.com>> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>     Assuming you're in Java. You could just follow on in
>>>>>>>>>>> your Main
>>>>>>>>>>> >>>>> method.
>>>>>>>>>>> >>>>>     Checking the state of the Result.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>     Example:
>>>>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>>>>> >>>>>     try {
>>>>>>>>>>> >>>>>     result.waitUntilFinish();
>>>>>>>>>>> >>>>>     if(result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>> >>>>>     //DO ES work
>>>>>>>>>>> >>>>>     }
>>>>>>>>>>> >>>>>     } catch(Exception e) {
>>>>>>>>>>> >>>>>     result.cancel();
>>>>>>>>>>> >>>>>     throw e;
>>>>>>>>>>> >>>>>     }
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a work
>>>>>>>>>>> flow.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>>>> >>>>> <j...@nanthrax.net
>>>>>>>>>>> >>>>>     <mailto:j...@nanthrax.net>> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         Hi,
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         We can imagine to have a user callback fn fired
>>>>>>>>>>> when the sink
>>>>>>>>>>> >>>>> batch is
>>>>>>>>>>> >>>>>         complete.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         Let me think about that.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         Regards
>>>>>>>>>>> >>>>>         JB
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>             Hey JB,
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>>>>> >>>>>             I suppose in that case I would need a way of
>>>>>>>>>>> monitoring
>>>>>>>>>>> >>>>> when the ES
>>>>>>>>>>> >>>>>             transform completes successfully before I can
>>>>>>>>>>> proceed with
>>>>>>>>>>> >>>>> doing the
>>>>>>>>>>> >>>>>             swap.
>>>>>>>>>>> >>>>>             The problem with this is that I can't think of
>>>>>>>>>>> a good way
>>>>>>>>>>> >>>>> to
>>>>>>>>>>> >>>>>             determine that termination state short of
>>>>>>>>>>> polling the new
>>>>>>>>>>> >>>>> index to
>>>>>>>>>>> >>>>>             check the document count compared to the size
>>>>>>>>>>> of input
>>>>>>>>>>> >>>>> PCollection.
>>>>>>>>>>> >>>>>             That, or maybe I'd need to use an external
>>>>>>>>>>> system like you
>>>>>>>>>>> >>>>> mentioned
>>>>>>>>>>> >>>>>             to poll on the state of the pipeline (I'm
>>>>>>>>>>> using Google
>>>>>>>>>>> >>>>> Dataflow, so
>>>>>>>>>>> >>>>>             maybe there's a way to do this with some API).
>>>>>>>>>>> >>>>>             But I would have thought that there would be
>>>>>>>>>>> an easy way of
>>>>>>>>>>> >>>>> simply
>>>>>>>>>>> >>>>>             saying "do not process this transform until
>>>>>>>>>>> this other
>>>>>>>>>>> >>>>> transform
>>>>>>>>>>> >>>>>             completes".
>>>>>>>>>>> >>>>>             Is there no established way of "signaling"
>>>>>>>>>>> between
>>>>>>>>>>> >>>>> pipelines when
>>>>>>>>>>> >>>>>             some pipeline completes, or have some way of
>>>>>>>>>>> declaring a
>>>>>>>>>>> >>>>> dependency
>>>>>>>>>>> >>>>>             of 1 transform on another transform?
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>             Thanks again,
>>>>>>>>>>> >>>>>             Philip
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM,
>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>> >>>>>             <j...@nanthrax.net <mailto:j...@nanthrax.net>
>>>>>>>>>>> >>>>> <mailto:j...@nanthrax.net
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>             <mailto:j...@nanthrax.net>>> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  You won't be able to do (3) in the same
>>>>>>>>>>> pipeline as
>>>>>>>>>>> >>>>> the
>>>>>>>>>>> >>>>>             Elasticsearch Sink
>>>>>>>>>>> >>>>>                  PTransform ends the pipeline with PDone.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  So, (3) has to be done in another
>>>>>>>>>>> pipeline (using a
>>>>>>>>>>> >>>>> DoFn) or in
>>>>>>>>>>> >>>>>             another
>>>>>>>>>>> >>>>>                  "system" (like Camel for instance). I
>>>>>>>>>>> would do a check
>>>>>>>>>>> >>>>> of the
>>>>>>>>>>> >>>>>             data in the
>>>>>>>>>>> >>>>>                  index and then trigger the swap there.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  Regards
>>>>>>>>>>> >>>>>                  JB
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      Hi,
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been
>>>>>>>>>>> trying to
>>>>>>>>>>> >>>>> use the
>>>>>>>>>>> >>>>>             ElasticSearchIO
>>>>>>>>>>> >>>>>                      sink to write docs into ES.
>>>>>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>>>>>> >>>>>                      1. ingest and transform rows from DB
>>>>>>>>>>> (done)
>>>>>>>>>>> >>>>>                      2. write JSON docs/strings into a new
>>>>>>>>>>> ES index
>>>>>>>>>>> >>>>> (done)
>>>>>>>>>>> >>>>>                      3. After (2) is complete and all
>>>>>>>>>>> documents are
>>>>>>>>>>> >>>>> written into
>>>>>>>>>>> >>>>>             a new index,
>>>>>>>>>>> >>>>>                      trigger an atomic index swap under an
>>>>>>>>>>> alias to
>>>>>>>>>>> >>>>> replace the
>>>>>>>>>>> >>>>>             current
>>>>>>>>>>> >>>>>                      aliased index with the new index
>>>>>>>>>>> generated in step
>>>>>>>>>>> >>>>> 2. This
>>>>>>>>>>> >>>>>             is basically
>>>>>>>>>>> >>>>>                      a single POST request to the ES
>>>>>>>>>>> cluster.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      The problem I'm facing is that I
>>>>>>>>>>> don't seem to be
>>>>>>>>>>> >>>>> able to
>>>>>>>>>>> >>>>>             find a way to
>>>>>>>>>>> >>>>>                      have a way for (3) to happen after
>>>>>>>>>>> step (2) is
>>>>>>>>>>> >>>>> complete.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      The ElasticSearchIO.Write transform
>>>>>>>>>>> returns a
>>>>>>>>>>> >>>>> PDone, and
>>>>>>>>>>> >>>>>             I'm not sure
>>>>>>>>>>> >>>>>                      how to proceed from there because it
>>>>>>>>>>> doesn't seem
>>>>>>>>>>> >>>>> to let me
>>>>>>>>>>> >>>>>             do another
>>>>>>>>>>> >>>>>                      apply on it to "define" a dependency.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> <
>>>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      Is there a recommended way to
>>>>>>>>>>> construct pipelines
>>>>>>>>>>> >>>>> workflows
>>>>>>>>>>> >>>>>             like this?
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>>>>> >>>>>                      Philip
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>                  --     Jean-Baptiste Onofré
>>>>>>>>>>> >>>>>             jbono...@apache.org <mailto:
>>>>>>>>>>> jbono...@apache.org>
>>>>>>>>>>> >>>>>             <mailto:jbono...@apache.org <mailto:
>>>>>>>>>>> jbono...@apache.org>>
>>>>>>>>>>> >>>>>             http://blog.nanthrax.net
>>>>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>         --         Jean-Baptiste Onofré
>>>>>>>>>>> >>>>>         jbono...@apache.org <mailto:jbono...@apache.org>
>>>>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>>>>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>>>>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> --
>>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>
>>>>>
>

Reply via email to