+1

At the very least an empty PCollection<?> could be produced with no
promises about its contents but the ability to be followed (e.g. as a
side input), which is forward compatible with whatever actual metadata
one may decide to produce in the future.
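
To make the "empty but followable" idea concrete, here is a minimal
plain-Java analogy. It is not Beam code: the class and method names
(FollowableWriteSketch, write, swapAfter) are hypothetical, and a
CompletableFuture merely stands in for the PCollection a sink might return.
The write step completes with an empty list, so nothing is promised about
its contents, yet a follow-up step can still be chained on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java analogy only: none of these names are Beam APIs.
class FollowableWriteSketch {

    // Writes every doc into `store` (a stand-in for an external index), then
    // completes with an EMPTY list: no promise about contents, but followable.
    static CompletableFuture<List<Object>> write(List<String> docs, List<String> store) {
        List<CompletableFuture<Void>> writes = new ArrayList<>();
        for (String doc : docs) {
            writes.add(CompletableFuture.runAsync(() -> store.add(doc)));
        }
        return CompletableFuture
            .allOf(writes.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> Collections.<Object>emptyList());
    }

    // Follow-up step (e.g. an alias swap) that runs only once the signal completes.
    static CompletableFuture<String> swapAfter(
            CompletableFuture<List<Object>> signal, List<String> store, String newIndex) {
        return signal.thenApply(
            ignored -> "alias -> " + newIndex + " (" + store.size() + " docs)");
    }

    public static void main(String[] args) {
        List<String> store = new CopyOnWriteArrayList<>();
        CompletableFuture<List<Object>> signal = write(Arrays.asList("a", "b", "c"), store);
        System.out.println(swapAfter(signal, store, "index_v2").join());
    }
}
```

If the write step later starts emitting real metadata (file names, row ids,
and so on), callers that only chain on completion keep working, which is the
forward compatibility argued for above.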

On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
> +dev@
>
> I am in complete agreement with Luke. Data dependencies are easy to
> understand and a good way for an IO to communicate and establish causal
> dependencies. Converting an IO from PDone to real output may spur further
> useful thoughts based on the design decisions about what sort of output is
> most useful.
>
> Kenn
>
> On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>> I think all sinks actually do have valuable information to output which
>> can be used after a write (file names, transaction/commit/row ids, table
>> names, ...). In addition to this metadata, having a PCollection of all
>> successful writes and all failed writes is useful for users so they can
>> chain an action which depends on what was or wasn't successfully written.
>> Users have requested adding retry/failure handling policies to sinks so that
>> failed writes don't jam up the pipeline.
>>
>> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com>
>> wrote:
>>>
>>> I agree generally with the idea that returning a PCollection makes all
>>> of this easier, since arbitrary additional functions can then be added.
>>> But what exactly would write functions return in a PCollection that would
>>> make sense? The whole idea is that we’ve written to an external system and
>>> now the collection itself is no longer needed.
>>>
>>> Currently, that’s represented with a PDone, which doesn’t allow any work
>>> to occur after it. I see a few possible ways of handling this given this
>>> conversation, and am curious which sounds like the best way to deal with
>>> the problem:
>>>
>>> 1. Have output transforms always return something specific (which would
>>> be the same across transforms by convention), that is in the form of a
>>> PCollection, so operations can occur after it.
>>>
>>> 2. Make either PDone or some new type that can act as a PCollection so we
>>> can run applies afterward.
>>>
>>> 3. Make output transforms provide the facility for a callback function
>>> which runs after the transform is complete.
>>>
>>> I went through these gymnastics recently when I was trying to build
>>> something that would move indices after writing to Algolia, and the solution
>>> was to co-opt code from the old Sink class that used to exist in Beam. The
>>> problem is that this approach requires the output transform in question
>>> to return a PCollection, even if it is trivial or doesn’t make sense to
>>> return one. This seems like a bad solution, but unfortunately there isn’t a
>>> notion of a transform that has no explicit output yet still needs to have
>>> operations occur after it.
>>>
>>> The three potential solutions above address this issue, but I would like
>>> to hear which would be preferable (or perhaps a different proposal
>>> altogether?). Perhaps we could also start up a ticket on this, since it
>>> seems like a worthwhile feature addition. I would find it really useful, for
>>> one.
>>>
>>> Chet
>>>
>>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> Instead of a callback fn, it's most useful if a PCollection is returned
>>> containing the result of the sink so that any arbitrary additional functions
>>> can be applied.
>>>
>>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>> wrote:
>>>>
>>>> Agreed, I would prefer to do the callback in the IO rather than in the
>>>> main method.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>
>>>>> I do something almost exactly like this, but with BigtableIO instead.
>>>>> I have a pull request open here [1] (which reminds me I need to finish
>>>>> this up...). It would really be nice for most IOs to support something
>>>>> like this.
>>>>>
>>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>>>>> the BigtableIO, and then feed that into your function which will run when
>>>>> all writes finish.
>>>>>
>>>>> You probably want to avoid doing something in the main method because
>>>>> there's no guarantee it'll actually run (maybe the driver will die, get
>>>>> killed, machine will explode, etc).
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>
>>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>
>>>>>     Assuming you're in Java, you could just follow on in your main
>>>>>     method, checking the state of the result.
>>>>>
>>>>>     Example:
>>>>>     PipelineResult result = pipeline.run();
>>>>>     try {
>>>>>       result.waitUntilFinish();
>>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>         // Do the ES work here
>>>>>       }
>>>>>     } catch (Exception e) {
>>>>>       result.cancel();
>>>>>       throw e;
>>>>>     }
>>>>>
>>>>>     Otherwise you could also use Oozie to construct a workflow.
>>>>>
>>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>     <j...@nanthrax.net> wrote:
>>>>>
>>>>>         Hi,
>>>>>
>>>>>         yes, we had a similar question some days ago.
>>>>>
>>>>>         We could imagine having a user callback fn fired when the sink
>>>>>         batch is complete.
>>>>>
>>>>>         Let me think about that.
>>>>>
>>>>>         Regards
>>>>>         JB
>>>>>
>>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>
>>>>>             Hey JB,
>>>>>
>>>>>             Thanks for getting back so quickly.
>>>>>             I suppose in that case I would need a way of monitoring when
>>>>>             the ES transform completes successfully before I can proceed
>>>>>             with doing the swap.
>>>>>             The problem with this is that I can't think of a good way to
>>>>>             determine that termination state short of polling the new
>>>>>             index to check the document count compared to the size of the
>>>>>             input PCollection.
>>>>>             That, or maybe I'd need to use an external system like you
>>>>>             mentioned to poll on the state of the pipeline (I'm using
>>>>>             Google Dataflow, so maybe there's a way to do this with some
>>>>>             API).
>>>>>             But I would have thought that there would be an easy way of
>>>>>             simply saying "do not process this transform until this other
>>>>>             transform completes".
>>>>>             Is there no established way of "signaling" between pipelines
>>>>>             when some pipeline completes, or some way of declaring a
>>>>>             dependency of one transform on another transform?
>>>>>
>>>>>             Thanks again,
>>>>>             Philip
>>>>>
>>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>>>>             <j...@nanthrax.net> wrote:
>>>>>
>>>>>                  Hi Philip,
>>>>>
>>>>>                  You won't be able to do (3) in the same pipeline, because
>>>>>                  the Elasticsearch sink PTransform ends the pipeline with
>>>>>                  PDone.
>>>>>
>>>>>                  So, (3) has to be done in another pipeline (using a DoFn)
>>>>>                  or in another "system" (like Camel for instance). I would
>>>>>                  do a check of the data in the index and then trigger the
>>>>>                  swap there.
>>>>>
>>>>>                  Regards
>>>>>                  JB
>>>>>
>>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>
>>>>>                      Hi,
>>>>>
>>>>>                      I'm pretty new to Beam, and I've been trying to use
>>>>>                      the ElasticsearchIO sink to write docs into ES.
>>>>>                      With this, I want to be able to
>>>>>                      1. ingest and transform rows from DB (done)
>>>>>                      2. write JSON docs/strings into a new ES index (done)
>>>>>                      3. After (2) is complete and all documents are written
>>>>>                      into a new index, trigger an atomic index swap under
>>>>>                      an alias to replace the current aliased index with the
>>>>>                      new index generated in step 2. This is basically a
>>>>>                      single POST request to the ES cluster.
>>>>>
>>>>>                      The problem I'm facing is that I don't seem to be able
>>>>>                      to find a way for (3) to happen after step (2) is
>>>>>                      complete.
>>>>>
>>>>>                      The ElasticsearchIO.Write transform returns a PDone,
>>>>>                      and I'm not sure how to proceed from there because it
>>>>>                      doesn't seem to let me do another apply on it to
>>>>>                      "define" a dependency.
>>>>>
>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>
>>>>>                      Is there a recommended way to construct pipeline
>>>>>                      workflows like this?
>>>>>
>>>>>                      Thanks in advance,
>>>>>                      Philip
>>>>>
>>>>>
>>>>>                  --
>>>>>                  Jean-Baptiste Onofré
>>>>>                  jbono...@apache.org
>>>>>                  http://blog.nanthrax.net
>>>>>                  Talend - http://www.talend.com
>>>>>
>>>>>
>>>>>
>>>>>         --
>>>>>         Jean-Baptiste Onofré
>>>>>         jbono...@apache.org
>>>>>         http://blog.nanthrax.net
>>>>>         Talend - http://www.talend.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>     --     Nick Verbeck - NerdyNick
>>>>>     ----------------------------------------------------
>>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>
>>>
>>>
>>
>
