+1 At the very least an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.
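[Editor's note: the "empty PCollection as a forward-compatible signal" idea above can be sketched as a transform that returns a PCollection<Void> instead of PDone. This is a hypothetical illustration, not an actual Beam IO — the class name `WriteWithSignal` and the write logic placeholder are made up:]

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical sketch of a sink that returns an (empty) signal
 * PCollection instead of PDone, so later stages can depend on it.
 */
public class WriteWithSignal
    extends PTransform<PCollection<String>, PCollection<Void>> {
  @Override
  public PCollection<Void> expand(PCollection<String> input) {
    return input.apply("Write", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Perform the actual external write for c.element() here.
        // Emit nothing: the output stays empty, but it can still be
        // consumed downstream to establish a causal dependency.
      }
    }));
  }
}
```

A downstream step could then consume this output as a side input (e.g. via `View.asIterable()`) so the runner won't run it until the writes have completed for the window; I believe later Beam releases added a built-in `Wait.on` transform along exactly these lines.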
On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
> +dev@
>
> I am in complete agreement with Luke. Data dependencies are easy to
> understand and a good way for an IO to communicate and establish causal
> dependencies. Converting an IO from PDone to real output may spur further
> useful thoughts based on the design decisions about what sort of output is
> most useful.
>
> Kenn
>
> On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>> I think all sinks actually do have valuable information to output which
>> can be used after a write (file names, transaction/commit/row ids, table
>> names, ...). In addition to this metadata, having a PCollection of all
>> successful writes and all failed writes is useful for users so they can
>> chain an action which depends on what was or wasn't successfully written.
>> Users have requested adding retry/failure handling policies to sinks so
>> that failed writes don't jam up the pipeline.
>>
>> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>>>
>>> So I agree generally with the idea that returning a PCollection makes
>>> all of this easier so that arbitrary additional functions can be added,
>>> but what exactly would write functions be returning in a PCollection
>>> that would make sense? The whole idea is that we've written to an
>>> external source and now the collection itself is no longer needed.
>>>
>>> Currently, that's represented with a PDone, which doesn't allow any
>>> work to occur after it. I see a few possible ways of handling this
>>> given this conversation, and am curious which sounds like the best way
>>> to deal with the problem:
>>>
>>> 1. Have output transforms always return something specific (the same
>>> across transforms by convention), in the form of a PCollection, so
>>> operations can occur after it.
>>>
>>> 2. Make either PDone or some new type act as a PCollection so we can
>>> run applies afterward.
>>>
>>> 3. Make output transforms provide the facility for a callback function
>>> which runs after the transform is complete.
>>>
>>> I went through these gymnastics recently when I was trying to build
>>> something that would move indices after writing to Algolia, and the
>>> solution was to co-opt code from the old Sink class that used to exist
>>> in Beam. The problem is that that approach requires the output
>>> transform in question to return a PCollection, even if it is trivial or
>>> doesn't make sense to return one. This seems like a bad solution, but
>>> unfortunately there isn't a notion of a transform that has no explicit
>>> output but still needs operations to occur after it.
>>>
>>> The three potential solutions above address this issue, but I would
>>> like to hear which would be preferable (or perhaps a different proposal
>>> altogether?). Perhaps we could also start a ticket on this, since it
>>> seems like a worthwhile feature addition. I would find it really
>>> useful, for one.
>>>
>>> Chet
>>>
>>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> Instead of a callback fn, it's most useful if a PCollection is returned
>>> containing the result of the sink so that any arbitrary additional
>>> functions can be applied.
>>>
>>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>
>>>> Agree, I would prefer to do the callback in the IO more than in the
>>>> main.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>
>>>>> I do something almost exactly like this, but with BigtableIO instead.
>>>>> I have a pull request open here [1] (which reminds me I need to
>>>>> finish this up...). It would really be nice for most IOs to support
>>>>> something like this.
>>>>>
>>>>> Essentially you do a GroupByKey (or some CombineFn) on the output
>>>>> from the BigtableIO, and then feed that into your function which will
>>>>> run when all writes finish.
>>>>>
>>>>> You probably want to avoid doing something in the main method because
>>>>> there's no guarantee it'll actually run (maybe the driver will die,
>>>>> get killed, the machine will explode, etc.).
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>
>>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>
>>>>>     Assuming you're in Java, you could just follow on in your main
>>>>>     method, checking the state of the result.
>>>>>
>>>>>     Example:
>>>>>
>>>>>     PipelineResult result = pipeline.run();
>>>>>     try {
>>>>>       result.waitUntilFinish();
>>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>         // do ES work
>>>>>       }
>>>>>     } catch (Exception e) {
>>>>>       result.cancel();
>>>>>       throw e;
>>>>>     }
>>>>>
>>>>>     Otherwise you could also use Oozie to construct a workflow.
>>>>>
>>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>>         Hi,
>>>>>
>>>>>         yes, we had a similar question some days ago.
>>>>>
>>>>>         We can imagine having a user callback fn fired when the sink
>>>>>         batch is complete.
>>>>>
>>>>>         Let me think about that.
>>>>>
>>>>>         Regards
>>>>>         JB
>>>>>
>>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>
>>>>>             Hey JB,
>>>>>
>>>>>             Thanks for getting back so quickly.
>>>>>             I suppose in that case I would need a way of monitoring
>>>>>             when the ES transform completes successfully before I can
>>>>>             proceed with doing the swap.
>>>>>             The problem with this is that I can't think of a good way
>>>>>             to determine that termination state short of polling the
>>>>>             new index to check the document count compared to the
>>>>>             size of the input PCollection.
>>>>>             That, or maybe I'd need to use an external system like
>>>>>             you mentioned to poll on the state of the pipeline (I'm
>>>>>             using Google Dataflow, so maybe there's a way to do this
>>>>>             with some API).
>>>>>             But I would have thought that there would be an easy way
>>>>>             of simply saying "do not process this transform until
>>>>>             this other transform completes".
>>>>>             Is there no established way of "signaling" between
>>>>>             pipelines when some pipeline completes, or some way of
>>>>>             declaring a dependency of one transform on another
>>>>>             transform?
>>>>>
>>>>>             Thanks again,
>>>>>             Philip
>>>>>
>>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>>                 Hi Philip,
>>>>>
>>>>>                 You won't be able to do (3) in the same pipeline, as
>>>>>                 the Elasticsearch sink PTransform ends the pipeline
>>>>>                 with PDone.
>>>>>
>>>>>                 So, (3) has to be done in another pipeline (using a
>>>>>                 DoFn) or in another "system" (like Camel, for
>>>>>                 instance). I would do a check of the data in the
>>>>>                 index and then trigger the swap there.
>>>>>
>>>>>                 Regards
>>>>>                 JB
>>>>>
>>>>>                 On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>
>>>>>                     Hi,
>>>>>
>>>>>                     I'm pretty new to Beam, and I've been trying to
>>>>>                     use the ElasticsearchIO sink to write docs into
>>>>>                     ES.
>>>>>                     With this, I want to be able to:
>>>>>                     1. ingest and transform rows from a DB (done)
>>>>>                     2. write JSON docs/strings into a new ES index
>>>>>                     (done)
>>>>>                     3. after (2) is complete and all documents are
>>>>>                     written into a new index, trigger an atomic index
>>>>>                     swap under an alias to replace the current
>>>>>                     aliased index with the new index generated in
>>>>>                     step 2. This is basically a single POST request
>>>>>                     to the ES cluster.
>>>>>
>>>>>                     The problem I'm facing is that I can't seem to
>>>>>                     find a way for (3) to happen after step (2) is
>>>>>                     complete.
>>>>>
>>>>>                     The ElasticsearchIO.Write transform returns a
>>>>>                     PDone, and I'm not sure how to proceed from there
>>>>>                     because it doesn't seem to let me do another
>>>>>                     apply on it to "define" a dependency.
>>>>>
>>>>>                     https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>
>>>>>                     Is there a recommended way to construct pipeline
>>>>>                     workflows like this?
>>>>>
>>>>>                     Thanks in advance,
>>>>>                     Philip
>>>>>
>>>>>                 --
>>>>>                 Jean-Baptiste Onofré
>>>>>                 jbono...@apache.org
>>>>>                 http://blog.nanthrax.net
>>>>>                 Talend - http://www.talend.com
>>>>>
>>>>>     --
>>>>>     Nick Verbeck - NerdyNick
>>>>>     ----------------------------------------------------
>>>>>     NerdyNick.com
>>>>>     TrailsOffroad.com
>>>>>     NoKnownBoundaries.com
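[Editor's note: for Philip's step (3) specifically, once the main method observes PipelineResult.State.DONE (as in Nick's snippet above), the atomic alias swap is a single POST to Elasticsearch's `_aliases` endpoint. A minimal stdlib-only sketch follows; the host, alias, and index names are made up, and this is one possible approach rather than a blessed recipe:]

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AliasSwap {
  /** Builds the _aliases actions body that atomically repoints an alias. */
  static String swapBody(String alias, String oldIndex, String newIndex) {
    return "{\"actions\":["
        + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
        + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
        + "]}";
  }

  /** Issues the swap; intended to be called only after the pipeline is DONE. */
  static int swap(String esBaseUrl, String alias, String oldIndex, String newIndex)
      throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(esBaseUrl + "/_aliases"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(swapBody(alias, oldIndex, newIndex)))
        .build();
    // Returns the HTTP status code; 200 indicates the swap succeeded.
    return HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.discarding())
        .statusCode();
  }
}
```

Because both the `remove` and `add` actions travel in one request, Elasticsearch applies them atomically, so readers of the alias never see a moment with no index (or both indices) behind it.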