I'm a bit confused by all of these suggestions: they sound plausible at a
high level, but I'm having a hard time making any one of them concrete.

So suppose we want to create a transform Wait.on(PCollection<?> signal):
PCollection<T> -> PCollection<T>.
a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
"a", but buffers panes of "a" in any given window until the final pane of
"sig" in the same window is fired (or, if it's never fired, until the
window closes? could use a deadletter for that maybe).

This transform I suppose would need to have a keyed and unkeyed version.

The keyed version would support merging window fns, and would require "a"
and "sig" to be keyed by the same key, and would work using a CoGbk -
followed by a stateful ParDo? Or is there a way to get away without a
stateful ParDo here? (not all runners support it)

The unkeyed version would not support merging window fns. Reuven, can you
elaborate how your combiner idea would work here - in particular, what do
you mean by "triggering only on the final pane"? Do you mean filter
non-final panes before entering the combiner? I wonder if that'll work,
probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
side input with a Never.ever() trigger"?

Thanks.

On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:

> This is an interesting point.
>
> In the past, we've often just though about sequencing some action to take
> place after the sink, in which case you can simply use the sink output as a
> main input. However if you want to run a transform with another PCollection
> as a main input, this doesn't work. And as you've discovered, triggered
> side inputs are defined to be non-deterministic, and there's no way to make
> things line up.
>
> What you're describing only makes sense if you're blocking against the
> final pane (since otherwise there's no reasonable way to match up somePC
> panes with the sink panes). There are multiple ways you can do this: one
> would be to CGBK the two PCollections together, and trigger the new
> transform only on the final pane. Another would be to add a combiner that
> returns a Void, triggering only on the final pane, and then make this
> singleton Void a side input. You could also do something explicit with the
> state API.
>
> Reuven
>
> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> So this appears not as easy as anticipated (surprise!)
>>
>> Suppose we have a PCollection "donePanes" with an element per
>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>> data has been written; this pane is: final / non-final".
>>
>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>> happens only after the final pane has been written.
>>
>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>> when c emits a *final* pane.
>>
>> Unfortunately, using
>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>> the trick: the side input becomes ready the moment *the first *pane of
>> data has been written.
>>
>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>> moment *the first* pane has been written, you just get an exception if
>> you access the side input before the *final* pane was written.
>>
>> I can't think of a pure-Beam solution to this: either "donePanes" will be
>> used as a main input to something (and then everything else can only be a
>> side input, which is not general enough), or it will be used as a side
>> input (and then we can't achieve "trigger only after the final pane fires").
>>
>> It seems that we need a way to control the side input pushback, and
>> configure whether a view becomes ready when its first pane has fired or
>> when its last pane has fired. I could see this be a property on the View
>> transform itself. In terms of implementation - I tried to figure out how
>> side input readiness is determined, in the direct runner and Dataflow
>> runner, and I'm completely lost and would appreciate some help.
>>
>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>
>>> This sounds great!
>>>
>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org>
>>> wrote:
>>>
>>>> This would be absolutely great! It seems somewhat similar to the
>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>>> ).
>>>>
>>>> I find it helpful to think about the different things that may come
>>>> after a sink. For instance:
>>>>
>>>> 1. It might be helpful to have a collection of failing input elements.
>>>> The type of failed elements is pretty straightforward -- just the input
>>>> elements. This allows handling such failures by directing them elsewhere or
>>>> performing additional processing.
>>>>
>>>
>>> BigQueryIO already does this as you point out.
>>>
>>>>
>>>> 2. For a sink that produces a series of files, it might be useful to
>>>> have a collection of the file names that have been completely written. This
>>>> allows performing additional handling on these completed segments.
>>>>
>>>
>>> In fact we already do this for FileBasedSinks.   See
>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>
>>>>
>>>> 3. For a sink that updates some destination, it would be reasonable to
>>>> have a collection that provides (periodically) output indicating how
>>>> complete the information written to that destination is. For instance, this
>>>> might be something like "<this bigquery table> has all of the elements up
>>>> to <input watermark>" complete. This allows tracking how much information
>>>> has been completely written out.
>>>>
>>>
>>> Interesting. Maybe tough to do since sinks often don't have that
>>> knowledge.
>>>
>>>
>>>>
>>>> I think those concepts map to the more detailed description Eugene
>>>> provided, but I find it helpful to focus on what information comes out of
>>>> the sink and how it might be used.
>>>>
>>>> Were there any use cases the above miss? Any functionality that has
>>>> been described that doesn't map to these use cases?
>>>>
>>>> -- Ben
>>>>
>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> It makes sense to consider how this maps onto existing kinds of sinks.
>>>>>
>>>>> E.g.:
>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write():
>>>>> that will emit 1 result per bundle (either a bogus value or number of
>>>>> records written) that will be Combine'd into 1 result per pane of input. A
>>>>> user can sequence against this and be notified when some intermediate
>>>>> amount of data has been written for a window, or (via .isFinal()) when all
>>>>> of it has been written.
>>>>> - Something that e.g. initiates an import job, such as
>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>> information about the job (e.g. its id and statistics). Role of panes is
>>>>> the same.
>>>>> - Something like above but that supports dynamic destinations: like in
>>>>> WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> where
>>>>> ResultT may be something like a list of files that were written for this
>>>>> pane of this destination.
>>>>>
>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I agree that the proper API for enabling the use case "do something
>>>>>> after the data has been written" is to return a PCollection of objects
>>>>>> where each object represents the result of writing some identifiable 
>>>>>> subset
>>>>>> of the data. Then one can apply a ParDo to this PCollection, in order to
>>>>>> "do something after this subset has been written".
>>>>>>
>>>>>> The challenging part here is *identifying* the subset of the data
>>>>>> that's been written, in a way consistent with Beam's unified
>>>>>> batch/streaming model, where saying "all data has been written" is not an
>>>>>> option because more data can arrive.
>>>>>>
>>>>>> The next choice is "a window of input has been written", but then
>>>>>> again, late data can arrive into a window as well.
>>>>>>
>>>>>> Next choice after that is "a pane of input has been written", but per
>>>>>> https://s.apache.org/beam-sink-triggers the term "pane of input" is
>>>>>> moot: triggering and panes should be something private to the sink, and 
>>>>>> the
>>>>>> same input can trigger different sinks differently. The hypothetical
>>>>>> different accumulation modes make this trickier still. I'm not sure 
>>>>>> whether
>>>>>> we intend to also challenge the idea that windowing is inherent to the
>>>>>> collection, or whether it too should be specified on a transform that
>>>>>> processes the collection. I think for the sake of this discussion we can
>>>>>> assume that it's inherent, and assume the mental model that the elements 
>>>>>> in
>>>>>> different windows of a PCollection are processed independently - "as if"
>>>>>> there were multiple pipelines processing each window.
>>>>>>
>>>>>> Overall, embracing the full picture, we end up with something like
>>>>>> this:
>>>>>> - The input PCollection is a composition of windows.
>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>>>>> windows), the below applies to the entire contents of the PCollection. If
>>>>>> it's merging (e.g. session windows), then it applies per-key, and the 
>>>>>> input
>>>>>> should be (perhaps implicitly) keyed in a way that the sink understands -
>>>>>> for example, the grouping by destination in DynamicDestinations in file 
>>>>>> and
>>>>>> bigquery writes.
>>>>>> - Each window's contents is a "changelog" - stream of elements and
>>>>>> retractions.
>>>>>> - A "sink" processes each window of the collection, deciding how to
>>>>>> handle elements and retractions (and whether to support retractions at 
>>>>>> all)
>>>>>> in a sink-specific way, and deciding *when* to perform the side effects 
>>>>>> for
>>>>>> a portion of the changelog (a "pane") based on the sink's triggering
>>>>>> strategy.
>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of
>>>>>> filenames that have been written, or simply a number of records that was
>>>>>> written, or a bogus void value etc. The result will implicitly include 
>>>>>> the
>>>>>> window of the input it's associated with. It will also implicitly include
>>>>>> pane information - index of the pane in this window, and whether this is
>>>>>> the first or last pane.
>>>>>> - The partitioning into bundles is an implementation detail and not
>>>>>> very useful, so before presenting the pane write results to the user, the
>>>>>> sink will probably want to Combine the bundle results so that there ends 
>>>>>> up
>>>>>> being 1 value for each pane that was written. Once again note that panes
>>>>>> may be associated with windows of the input as a whole, but if the input 
>>>>>> is
>>>>>> keyed (like with DynamicDestinations) they'll be associated with per-key
>>>>>> subsets of windows of the input.
>>>>>> - This combining requires an extra, well, combining operation, so it
>>>>>> should be optional.
>>>>>> - The user will end up getting either a PCollection<ResultT> or a
>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
>>>>>> the elements of this collection will implicitly have window and pane
>>>>>> information, available via the implicit BoundedWindow and PaneInfo.
>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the
>>>>>> fact that trigger strategy is set on the input. But in that case the user
>>>>>> will have to accept that the PaneInfo of ResultT's is not necessarily
>>>>>> directly related to panes of the input - the sink is allowed to do 
>>>>>> internal
>>>>>> aggregation as an implementation detail, which may modify the triggering
>>>>>> strategy. Basically the user will still get sink-assigned panes.
>>>>>> - In most cases, one may imagine that the user is interested in being
>>>>>> notified of "no more data associated with this window will be written", 
>>>>>> so
>>>>>> the user will ignore all ResultT's except those where the pane is marked
>>>>>> final. If a user is interested in being notified of intermediate write
>>>>>> results - they'll have to embrace the fact that they cannot identify the
>>>>>> precise subset of input associated with the intermediate result.
>>>>>>
>>>>>> I think the really key points of the above are:
>>>>>> - Sinks should support windowed input. Sinks should write different
>>>>>> windows of input independently. If the sink can write multi-destination
>>>>>> input, the destination should function as a grouping key, and in that 
>>>>>> case
>>>>>> merging windowing should be allowed.
>>>>>> - Producing a PCollection of write results should be optional.
>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>> results that may be keyed or unkeyed (per above), and are placed in the
>>>>>> window of the input that was written, and have a PaneInfo assigned by the
>>>>>> sink, of which probably the only part useful to the user is whether it's
>>>>>> .isFinal().
>>>>>>
>>>>>> Does this sound reasonable?
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> +1
>>>>>>>
>>>>>>> At the very least an empty PCollection<?> could be produced with no
>>>>>>> promises about its contents but the ability to be followed (e.g. as a
>>>>>>> side input), which is forward compatible with whatever actual
>>>>>>> metadata
>>>>>>> one may decide to produce in the future.
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>> > +dev@
>>>>>>> >
>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>>>>>> > understand and a good way for an IO to communicate and establish
>>>>>>> causal
>>>>>>> > dependencies. Converting an IO from PDone to real output may spur
>>>>>>> further
>>>>>>> > useful thoughts based on the design decisions about what sort of
>>>>>>> output is
>>>>>>> > most useful.
>>>>>>> >
>>>>>>> > Kenn
>>>>>>> >
>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> I think all sinks actually do have valuable information to output
>>>>>>> which
>>>>>>> >> can be used after a write (file names, transaction/commit/row
>>>>>>> ids, table
>>>>>>> >> names, ...). In addition to this metadata, having a PCollection
>>>>>>> of all
>>>>>>> >> successful writes and all failed writes is useful for users so
>>>>>>> they can
>>>>>>> >> chain an action which depends on what was or wasn't successfully
>>>>>>> written.
>>>>>>> >> Users have requested adding retry/failure handling policies to
>>>>>>> sinks so that
>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>> >>
>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>>>>>> chet.aldr...@postmates.com>
>>>>>>> >> wrote:
>>>>>>> >>>
>>>>>>> >>> So I agree generally with the idea that returning a PCollection
>>>>>>> makes all
>>>>>>> >>> of this easier so that arbitrary additional functions can be
>>>>>>> added, what
>>>>>>> >>> exactly would write functions be returning in a PCollection that
>>>>>>> would make
>>>>>>> >>> sense? The whole idea is that we’ve written to an external
>>>>>>> source and now
>>>>>>> >>> the collection itself is no longer needed.
>>>>>>> >>>
>>>>>>> >>> Currently, that’s represented with a PDone, but currently that
>>>>>>> doesn’t
>>>>>>> >>> allow any work to occur after it. I see a couple possible ways
>>>>>>> of handling
>>>>>>> >>> this given this conversation, and am curious which solution
>>>>>>> sounds like the
>>>>>>> >>> best way to deal with the problem:
>>>>>>> >>>
>>>>>>> >>> 1. Have output transforms always return something specific
>>>>>>> (which would
>>>>>>> >>> be the same across transforms by convention), that is in the
>>>>>>> form of a
>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>> >>>
>>>>>>> >>> 2. Make either PDone or some new type that can act as a
>>>>>>> PCollection so we
>>>>>>> >>> can run applies afterward.
>>>>>>> >>>
>>>>>>> >>> 3. Make output transforms provide the facility for a callback
>>>>>>> function
>>>>>>> >>> which runs after the transform is complete.
>>>>>>> >>>
>>>>>>> >>> I went through these gymnastics recently when I was trying to
>>>>>>> build
>>>>>>> >>> something that would move indices after writing to Algolia, and
>>>>>>> the solution
>>>>>>> >>> was to co-opt code from the old Sink class that used to exist in
>>>>>>> Beam. The
>>>>>>> >>> problem is that particular method requires the output transform
>>>>>>> in question
>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t make
>>>>>>> sense to
>>>>>>> >>> return one. This seems like a bad solution, but unfortunately
>>>>>>> there isn’t a
>>>>>>> >>> notion of a transform that has no explicit output that needs to
>>>>>>> have
>>>>>>> >>> operations occur after it.
>>>>>>> >>>
>>>>>>> >>> The three potential solutions above address this issue, but I
>>>>>>> would like
>>>>>>> >>> to hear on which would be preferable (or perhaps a different
>>>>>>> proposal
>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on this,
>>>>>>> since it
>>>>>>> >>> seems like a worthwhile feature addition. I would find it really
>>>>>>> useful, for
>>>>>>> >>> one.
>>>>>>> >>>
>>>>>>> >>> Chet
>>>>>>> >>>
>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>> >>>
>>>>>>> >>> Instead of a callback fn, its most useful if a PCollection is
>>>>>>> returned
>>>>>>> >>> containing the result of the sink so that any arbitrary
>>>>>>> additional functions
>>>>>>> >>> can be applied.
>>>>>>> >>>
>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>>>>>> j...@nanthrax.net>
>>>>>>> >>> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than in
>>>>>>> the
>>>>>>> >>>> main.
>>>>>>> >>>>
>>>>>>> >>>> Regards
>>>>>>> >>>> JB
>>>>>>> >>>>
>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO
>>>>>>> instead.
>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I need
>>>>>>> to finish this
>>>>>>> >>>>> up...).  It would really be nice for most IOs to support
>>>>>>> something like
>>>>>>> >>>>> this.
>>>>>>> >>>>>
>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the
>>>>>>> output from
>>>>>>> >>>>> the BigtableIO, and then feed that into your function which
>>>>>>> will run when
>>>>>>> >>>>> all writes finish.
>>>>>>> >>>>>
>>>>>>> >>>>> You probably want to avoid doing something in the main method
>>>>>>> because
>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver will
>>>>>>> die, get
>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>> >>>>>
>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>> >>>>>
>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com
>>>>>>> >>>>> <mailto:nerdyn...@gmail.com>> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>>     Assuming you're in Java. You could just follow on in your
>>>>>>> Main
>>>>>>> >>>>> method.
>>>>>>> >>>>>     Checking the state of the Result.
>>>>>>> >>>>>
>>>>>>> >>>>>     Example:
>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>> >>>>>     try {
>>>>>>> >>>>>     result.waitUntilFinish();
>>>>>>> >>>>>     if(result.getState() == PipelineResult.State.DONE) {
>>>>>>> >>>>>     //DO ES work
>>>>>>> >>>>>     }
>>>>>>> >>>>>     } catch(Exception e) {
>>>>>>> >>>>>     result.cancel();
>>>>>>> >>>>>     throw e;
>>>>>>> >>>>>     }
>>>>>>> >>>>>
>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a work
>>>>>>> flow.
>>>>>>> >>>>>
>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>> >>>>> <j...@nanthrax.net
>>>>>>> >>>>>     <mailto:j...@nanthrax.net>> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>>         Hi,
>>>>>>> >>>>>
>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>> >>>>>
>>>>>>> >>>>>         We can imagine to have a user callback fn fired when
>>>>>>> the sink
>>>>>>> >>>>> batch is
>>>>>>> >>>>>         complete.
>>>>>>> >>>>>
>>>>>>> >>>>>         Let me think about that.
>>>>>>> >>>>>
>>>>>>> >>>>>         Regards
>>>>>>> >>>>>         JB
>>>>>>> >>>>>
>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>> >>>>>
>>>>>>> >>>>>             Hey JB,
>>>>>>> >>>>>
>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>> >>>>>             I suppose in that case I would need a way of
>>>>>>> monitoring
>>>>>>> >>>>> when the ES
>>>>>>> >>>>>             transform completes successfully before I can
>>>>>>> proceed with
>>>>>>> >>>>> doing the
>>>>>>> >>>>>             swap.
>>>>>>> >>>>>             The problem with this is that I can't think of a
>>>>>>> good way
>>>>>>> >>>>> to
>>>>>>> >>>>>             determine that termination state short of polling
>>>>>>> the new
>>>>>>> >>>>> index to
>>>>>>> >>>>>             check the document count compared to the size of
>>>>>>> input
>>>>>>> >>>>> PCollection.
>>>>>>> >>>>>             That, or maybe I'd need to use an external system
>>>>>>> like you
>>>>>>> >>>>> mentioned
>>>>>>> >>>>>             to poll on the state of the pipeline (I'm using
>>>>>>> Google
>>>>>>> >>>>> Dataflow, so
>>>>>>> >>>>>             maybe there's a way to do this with some API).
>>>>>>> >>>>>             But I would have thought that there would be an
>>>>>>> easy way of
>>>>>>> >>>>> simply
>>>>>>> >>>>>             saying "do not process this transform until this
>>>>>>> other
>>>>>>> >>>>> transform
>>>>>>> >>>>>             completes".
>>>>>>> >>>>>             Is there no established way of "signaling" between
>>>>>>> >>>>> pipelines when
>>>>>>> >>>>>             some pipeline completes, or have some way of
>>>>>>> declaring a
>>>>>>> >>>>> dependency
>>>>>>> >>>>>             of 1 transform on another transform?
>>>>>>> >>>>>
>>>>>>> >>>>>             Thanks again,
>>>>>>> >>>>>             Philip
>>>>>>> >>>>>
>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste
>>>>>>> Onofré
>>>>>>> >>>>>             <j...@nanthrax.net <mailto:j...@nanthrax.net>
>>>>>>> >>>>> <mailto:j...@nanthrax.net
>>>>>>> >>>>>
>>>>>>> >>>>>             <mailto:j...@nanthrax.net>>> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>>                  Hi Philip,
>>>>>>> >>>>>
>>>>>>> >>>>>                  You won't be able to do (3) in the same
>>>>>>> pipeline as
>>>>>>> >>>>> the
>>>>>>> >>>>>             Elasticsearch Sink
>>>>>>> >>>>>                  PTransform ends the pipeline with PDone.
>>>>>>> >>>>>
>>>>>>> >>>>>                  So, (3) has to be done in another pipeline
>>>>>>> (using a
>>>>>>> >>>>> DoFn) or in
>>>>>>> >>>>>             another
>>>>>>> >>>>>                  "system" (like Camel for instance). I would
>>>>>>> do a check
>>>>>>> >>>>> of the
>>>>>>> >>>>>             data in the
>>>>>>> >>>>>                  index and then trigger the swap there.
>>>>>>> >>>>>
>>>>>>> >>>>>                  Regards
>>>>>>> >>>>>                  JB
>>>>>>> >>>>>
>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>> >>>>>
>>>>>>> >>>>>                      Hi,
>>>>>>> >>>>>
>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been
>>>>>>> trying to
>>>>>>> >>>>> use the
>>>>>>> >>>>>             ElasticSearchIO
>>>>>>> >>>>>                      sink to write docs into ES.
>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>> >>>>>                      1. ingest and transform rows from DB
>>>>>>> (done)
>>>>>>> >>>>>                      2. write JSON docs/strings into a new ES
>>>>>>> index
>>>>>>> >>>>> (done)
>>>>>>> >>>>>                      3. After (2) is complete and all
>>>>>>> documents are
>>>>>>> >>>>> written into
>>>>>>> >>>>>             a new index,
>>>>>>> >>>>>                      trigger an atomic index swap under an
>>>>>>> alias to
>>>>>>> >>>>> replace the
>>>>>>> >>>>>             current
>>>>>>> >>>>>                      aliased index with the new index
>>>>>>> generated in step
>>>>>>> >>>>> 2. This
>>>>>>> >>>>>             is basically
>>>>>>> >>>>>                      a single POST request to the ES cluster.
>>>>>>> >>>>>
>>>>>>> >>>>>                      The problem I'm facing is that I don't
>>>>>>> seem to be
>>>>>>> >>>>> able to
>>>>>>> >>>>>             find a way to
>>>>>>> >>>>>                      have a way for (3) to happen after step
>>>>>>> (2) is
>>>>>>> >>>>> complete.
>>>>>>> >>>>>
>>>>>>> >>>>>                      The ElasticSearchIO.Write transform
>>>>>>> returns a
>>>>>>> >>>>> PDone, and
>>>>>>> >>>>>             I'm not sure
>>>>>>> >>>>>                      how to proceed from there because it
>>>>>>> doesn't seem
>>>>>>> >>>>> to let me
>>>>>>> >>>>>             do another
>>>>>>> >>>>>                      apply on it to "define" a dependency.
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>>>>
>>>>>>> >>>>> <
>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>> >>>
>>>>>>> >>>>>
>>>>>>> >>>>>                      Is there a recommended way to construct
>>>>>>> pipelines
>>>>>>> >>>>> workflows
>>>>>>> >>>>>             like this?
>>>>>>> >>>>>
>>>>>>> >>>>>                      Thanks in advance,
>>>>>>> >>>>>                      Philip
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>                  --     Jean-Baptiste Onofré
>>>>>>> >>>>>             jbono...@apache.org <mailto:jbono...@apache.org>
>>>>>>> >>>>>             <mailto:jbono...@apache.org <mailto:
>>>>>>> jbono...@apache.org>>
>>>>>>> >>>>>             http://blog.nanthrax.net
>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>         --         Jean-Baptiste Onofré
>>>>>>> >>>>>         jbono...@apache.org <mailto:jbono...@apache.org>
>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>>
>>>>>>> >>>>
>>>>>>> >>>> --
>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>> >>>> jbono...@apache.org
>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>

Reply via email to