Hi Pei, I have someone on my team who started to work on this. I'll follow up, thanks for the bump ;-)
Amit

On Thu, Oct 13, 2016 at 8:38 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Pei,
>
> good one!
>
> We now have to update the 'other' runners.
>
> Thanks.
>
> Regards
> JB
>
> On 10/12/2016 10:48 PM, Pei He wrote:
>> Hi,
>> I just want to bump this thread and bring it to attention.
>>
>> PipelineResult now has cancel() and waitUntilFinish(). However, currently
>> only DataflowRunner supports them, in DataflowPipelineJob.
>>
>> We agreed that users should do "p.run().waitUntilFinish()" if they want to
>> block. But if they do it now, the direct, Flink and Spark runners will throw
>> exceptions.
>>
>> I have opened the following JIRA issues. Could anyone help with them?
>>
>> https://issues.apache.org/jira/browse/BEAM-596
>> https://issues.apache.org/jira/browse/BEAM-595
>> https://issues.apache.org/jira/browse/BEAM-593
>>
>> Thanks
>> --
>> Pei
>>
>> On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela <amitsel...@gmail.com> wrote:
>>> +1 and Thanks!
>>>
>>> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw <rober...@google.com.invalid> wrote:
>>>> +1, sounds great. Thanks Pei.
>>>>
>>>> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:
>>>>> +1 for your proposal Pei
>>>>>
>>>>> On Mon, Jul 25, 2016 at 5:54 PM, Pei He <pe...@google.com.invalid> wrote:
>>>>>> It looks to me like the following are agreed:
>>>>>> (1). Adding cancel() and waitUntilFinish() to PipelineResult.
>>>>>> (In streaming mode, "all data watermarks reach infinity" is
>>>>>> considered finished.)
>>>>>> (2). PipelineRunner.run() should return relatively quickly, as soon as
>>>>>> the pipeline/job is started/running. The blocking logic should be left
>>>>>> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
>>>>>> runners that finish quickly can block run() until the execution is
>>>>>> done.
>>>>>> So it is cleaner to verify test results after run().)
>>>>>>
>>>>>> I will send out a PR for (1), and create JIRA issues to improve the runners
>>>>>> for (2).
>>>>>>
>>>>>> waitToRunning() is controversial, and we have several half-way agreed
>>>>>> proposals. I will pull them out of this thread, so we can close this
>>>>>> proposal with cancel() and waitUntilFinish(). And I will create a JIRA
>>>>>> issue to track how to support "waiting until other states".
>>>>>>
>>>>>> Does that sound good to everyone?
>>>>>>
>>>>>> Thanks
>>>>>> --
>>>>>> Pei
>>>>>>
>>>>>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:
>>>>>>> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <bchamb...@apache.org> wrote:
>>>>>>>> This health check seems redundant with just waiting a while and then
>>>>>>>> checking on the status, other than returning earlier in the case of
>>>>>>>> reaching a terminal state. What about adding:
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * Returns the state after waiting the specified duration. Will return
>>>>>>>>  * earlier if the pipeline reaches a terminal state.
>>>>>>>>  */
>>>>>>>> State getStateAfter(Duration duration);
>>>>>>>>
>>>>>>>> This seems to be a useful building block, both for the user's pipeline
>>>>>>>> (in case they wanted to build something like wait and then check health)
>>>>>>>> and also for the SDK (to implement waitUntilFinished, etc.)
>>>>>>>
>>>>>>> A generic waitFor(Duration) which may return early if a terminal state
>>>>>>> is entered seems useful. I don't know that we need a return value
>>>>>>> here, given that we can then query the PipelineResult however we want
>>>>>>> once this returns. waitUntilFinished is simply
>>>>>>> waitFor(InfiniteDuration).
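Robert's idea of a generic waitFor(Duration) that may return early on a terminal state, with waitUntilFinish() as the unbounded case, can be sketched in plain Java. Everything here (`SketchResult`, `State`, `setState`, `INFINITE`) is hypothetical and self-contained; it is not Beam's PipelineResult API, and it assumes the runner pushes each state change into the object through `setState`.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical job states for this sketch only; not Beam's PipelineResult.State.
enum State {
  RUNNING, DONE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this != RUNNING;
  }
}

// Hypothetical result handle; the (assumed) runner reports changes via setState().
class SketchResult {
  // Effectively unbounded; safe with the usual System.nanoTime() deadline idiom.
  static final Duration INFINITE = Duration.ofNanos(Long.MAX_VALUE);

  private State state = State.RUNNING;

  synchronized State getState() {
    return state;
  }

  synchronized void setState(State newState) {
    state = newState;
    notifyAll(); // wake any thread blocked in waitFor()
  }

  // Waits up to `duration`, returning early once a terminal state is reached.
  // No return value: callers query getState() afterwards.
  synchronized void waitFor(Duration duration) {
    long deadline = System.nanoTime() + duration.toNanos();
    try {
      while (!state.isTerminal()) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          return; // timed out while still non-terminal
        }
        TimeUnit.NANOSECONDS.timedWait(this, remaining); // loop handles spurious wakeups
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
    }
  }

  // waitUntilFinish() is just waitFor() with an unbounded duration.
  void waitUntilFinish() {
    waitFor(INFINITE);
  }
}
```

Because waitFor returns nothing, the caller decides what to do next by calling getState(), which matches Robert's point that a return value is not strictly needed.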
>>>>>>>
>>>>>>>> On Thu, Jul 21, 2016 at 4:11 PM Pei He <pe...@google.com.invalid> wrote:
>>>>>>>>> I am not in favor of supporting waiting for every state, or
>>>>>>>>> waitUntilState(...).
>>>>>>>>> One reason is that PipelineResult.State is not well defined and is not
>>>>>>>>> agreed upon across runners.
>>>>>>>>> Another reason is that users might not want to wait for a particular
>>>>>>>>> state. For example, waitUntilFinish() is to wait for a terminal state.
>>>>>>>>> So, even if runners have different states, we can still define shared
>>>>>>>>> properties, such as finished/terminal.
>>>>>>>
>>>>>>> +1. Running is an intermediate state that doesn't have an obvious
>>>>>>> mapping onto all runners, which is another reason it's odd to wait
>>>>>>> until then. All runners have terminal states.
>>>>>>>
>>>>>>>>> I think when users call waitUntilRunning(), they want to make sure the
>>>>>>>>> pipeline is up and running and is healthy. Maybe we want to wait for
>>>>>>>>> at least one element to go through the pipeline.
>>>>>>>
>>>>>>> -1, that might be a while... Also, you may not start generating data
>>>>>>> until your pipeline is up.
>>>>>>>
>>>>>>>>> What about changing waitUntilRunning() to the following?
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * Checks whether the pipeline is healthy for the duration.
>>>>>>>>>  *
>>>>>>>>>  * Returns true if the pipeline is healthy at the end of the duration.
>>>>>>>>>  * Returns false if the pipeline is not healthy at the end of the duration.
>>>>>>>>>  * <p>It may return early if the pipeline is in an unrecoverable failure
>>>>>>>>>  * state.
>>>>>>>>>  */
>>>>>>>>> boolean PipelineResult.healthCheck(Duration duration)
>>>>>>>>>
>>>>>>>>> (I think this also addresses Robert's comment about waitToRunning().)
>>>>>>>>>
>>>>>>>>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>>>>>>> Some more comments:
>>>>>>>>>>
>>>>>>>>>> - What are the allowed/expected state transitions prior to RUNNING?
>>>>>>>>>> Today, I presume it is any nonterminal state, so it can be UNKNOWN or
>>>>>>>>>> STOPPED (which really means "not yet started") prior to RUNNING. Is
>>>>>>>>>> this what we want?
>>>>>>>>>>
>>>>>>>>>> - If a job can be paused, a transition from RUNNING to STOPPED, then
>>>>>>>>>> waitUntilPaused(Duration) makes sense.
>>>>>>>>>>
>>>>>>>>>> - Assuming there is some polling under the hood, are runners required
>>>>>>>>>> to send back a full history of transitions? Or can transitions be
>>>>>>>>>> missed, with only the latest state retrieved?
>>>>>>>>>>
>>>>>>>>>> - If the latter, then does waitUntilRunning() only wait until RUNNING,
>>>>>>>>>> or does it also return when it sees STOPPED, which could certainly
>>>>>>>>>> indicate that the job transitioned to RUNNING and then STOPPED in
>>>>>>>>>> between polls? In that case it is, today, the same as
>>>>>>>>>> waitUntilStateIsKnown().
>>>>>>>>>>
>>>>>>>>>> - The obvious limit of this discussion is waitUntilState(Duration,
>>>>>>>>>> Set<State>), which is the same amount of work to implement. Am I
>>>>>>>>>> correct that everyone in this thread thinks this generality is just
>>>>>>>>>> not the right thing for a user API?
>>>>>>>>>>
>>>>>>>>>> - This enum could probably use revision. I'd choose some combination
>>>>>>>>>> of tightening the enum, making it extensible, and making some aspect
>>>>>>>>>> of it free-form.
>>>>>>>>>> Not sure where the best balance lies.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
>>>>>>>>>>> (Minor issue: I'd propose waitUntilDone and waitUntilRunning rather
>>>>>>>>>>> than waitToRunning, which reads oddly.)
>>>>>>>>>>>
>>>>>>>>>>> The only reason to separate submission from waitUntilRunning would be
>>>>>>>>>>> if you wanted to kick off several pipelines in quick succession, then
>>>>>>>>>>> wait for them all to be running. For instance:
>>>>>>>>>>>
>>>>>>>>>>> PipelineResult p1Future = p1.run();
>>>>>>>>>>> PipelineResult p2Future = p2.run();
>>>>>>>>>>> ...
>>>>>>>>>>>
>>>>>>>>>>> p1Future.waitUntilRunning();
>>>>>>>>>>> p2Future.waitUntilRunning();
>>>>>>>>>>> ...
>>>>>>>>>>>
>>>>>>>>>>> In this setup, you can start several pipelines more quickly, but your
>>>>>>>>>>> main program would still wait and report any errors before exiting.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
>>>>>>>>>>>> I'm in favor of the proposal. My only question is whether we need
>>>>>>>>>>>> PipelineResult.waitToRunning(); instead, I'd propose that run() block
>>>>>>>>>>>> until the pipeline is running/successfully submitted (or has failed).
>>>>>>>>>>>> This would simplify the API: we'd only have one kind of wait that
>>>>>>>>>>>> makes sense in all cases.
>>>>>>>>>>>>
>>>>>>>>>>>> What kinds of interactions would one want to have with the
>>>>>>>>>>>> PipelineResult before it's running?
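Ben's launch-then-wait pattern can be sketched the same way: start every job first, then block on each handle and collect the failures. `JobHandle`, `AsyncRunner`, `LaunchAll` and the `submissionSucceeds` flag are hypothetical stand-ins (the flag only simulates a submission that fails); none of them are Beam classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical states for this sketch; STARTING means "submitted, not yet confirmed".
enum JobState { STARTING, RUNNING, FAILED }

// Hypothetical handle returned by a non-blocking run(); not Beam's PipelineResult.
class JobHandle {
  private JobState state = JobState.STARTING;

  synchronized void update(JobState newState) {
    state = newState;
    notifyAll();
  }

  // Blocks until the job has left STARTING. Returning on any later state, not only
  // RUNNING, means a transition missed between polls cannot hang the caller.
  synchronized JobState waitUntilRunning() {
    while (state == JobState.STARTING) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return state;
  }
}

// Hypothetical runner: run() returns immediately and "submits" on a background thread.
class AsyncRunner {
  JobHandle run(boolean submissionSucceeds) {
    JobHandle handle = new JobHandle();
    new Thread(() -> handle.update(submissionSucceeds ? JobState.RUNNING : JobState.FAILED)).start();
    return handle;
  }
}

class LaunchAll {
  // Start every job first, then wait on each one, collecting the names that failed.
  static List<String> launch(AsyncRunner runner, Map<String, Boolean> jobs) {
    Map<String, JobHandle> handles = new LinkedHashMap<>();
    for (Map.Entry<String, Boolean> job : jobs.entrySet()) {
      handles.put(job.getKey(), runner.run(job.getValue()));
    }
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, JobHandle> h : handles.entrySet()) {
      if (h.getValue().waitUntilRunning() == JobState.FAILED) {
        failed.add(h.getKey());
      }
    }
    return failed;
  }
}
```

Letting waitUntilRunning() return on any post-submission state is one possible answer to Kenneth's question about missed transitions; it trades precision about RUNNING for a guarantee that the launcher never waits forever on a job that already moved on.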
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh <tg...@google.com.invalid> wrote:
>>>>>>>>>>>>> TestPipeline is probably the one runner that can be expected to
>>>>>>>>>>>>> block, as JUnit tests certainly, and other tests likely, will run the
>>>>>>>>>>>>> Pipeline and succeed even if the PipelineRunner throws an exception.
>>>>>>>>>>>>> Luckily, this can be added to TestPipeline.run(), which already has
>>>>>>>>>>>>> additional behavior associated with it (currently regarding the
>>>>>>>>>>>>> unwrapping of AssertionErrors).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>>>>>>>>>>> I like this proposal. It makes pipeline.run() seem like a pretty
>>>>>>>>>>>>>> normal async request, and easy to program with. It removes the
>>>>>>>>>>>>>> implicit assumption in the prior design that main() is pretty much
>>>>>>>>>>>>>> just "build and run a pipeline".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The part of this that I care about most is being able to write a
>>>>>>>>>>>>>> program (not the pipeline, but the program that launches one or more
>>>>>>>>>>>>>> pipelines) that has reasonable cross-runner behavior.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One comment:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 20, 2016 at 3:39 PM, Pei He <pe...@google.com.invalid> wrote:
>>>>>>>>>>>>>>> 4. PipelineRunner.run() should (but is not required to) do non-blocking runs
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think we can elaborate on this a little bit.
>>>>>>>>>>>>>> Obviously there might be "blocking" in terms of, say, an HTTP
>>>>>>>>>>>>>> round-trip to submit the job, but run() should never be
>>>>>>>>>>>>>> non-terminating.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For a test runner that finishes the pipeline quickly, I would be fine
>>>>>>>>>>>>>> with run() just executing the pipeline, but the PipelineResult should
>>>>>>>>>>>>>> still emulate the usual behavior, just always returning a terminal
>>>>>>>>>>>>>> status. It would be annoying to add waitToFinish() to the end of all
>>>>>>>>>>>>>> our tests, but leaving a bare run() makes the tests only work with
>>>>>>>>>>>>>> special blocking runner wrappers (and makes them poor examples). A
>>>>>>>>>>>>>> JUnit @Rule for test pipelines would hide all that, perhaps.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
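Kenn's suggestion for quick test runners (execute the pipeline inside run(), but still hand back a result that always reports a terminal status) might look like the following minimal sketch. `SketchPipelineResult`, `Status` and `BlockingTestRunner` are hypothetical names, and the pipeline itself is simulated by a plain `Runnable`.

```java
// Hypothetical states for this sketch only.
enum Status {
  RUNNING, DONE, FAILED;

  boolean isTerminal() {
    return this != RUNNING;
  }
}

// Hypothetical minimal result interface modeled on the thread's proposal.
interface SketchPipelineResult {
  Status getState();

  Status waitUntilFinish();

  Status cancel();
}

// Hypothetical test runner: run() executes the simulated pipeline synchronously and
// returns a result that is already terminal.
class BlockingTestRunner {
  SketchPipelineResult run(Runnable pipelineBody) {
    Status outcome;
    try {
      pipelineBody.run();
      outcome = Status.DONE;
    } catch (RuntimeException e) {
      // Only RuntimeExceptions are mapped to FAILED; Errors such as
      // AssertionError are not caught and propagate out of run().
      outcome = Status.FAILED;
    }
    final Status terminal = outcome;
    return new SketchPipelineResult() {
      @Override
      public Status getState() {
        return terminal;
      }

      @Override
      public Status waitUntilFinish() {
        return terminal; // nothing left to wait for
      }

      @Override
      public Status cancel() {
        return terminal; // already terminal, so cancelling is a no-op
      }
    };
  }
}
```

With this shape, test code can end in `p.run().waitUntilFinish()` regardless of runner: on a blocking test runner the call simply returns the status computed during run().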