Hi Pei,

I have someone on my time who started to work on this, I'll follow-up,
thanks for the bum ;-)

Amit

On Thu, Oct 13, 2016 at 8:38 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Pei,
>
> good one !
>
> We now have to update the 'other' runners.
>
> Thanks.
>
> Regards
> JB
>
> On 10/12/2016 10:48 PM, Pei He wrote:
> > Hi,
> > I just want to bump this thread, and brought it to attention.
> >
> > PipelineResult now have cancel() and waitUntilFinish(). However,
> currently
> > only DataflowRunner supports it in DataflowPipelineJob.
> >
> > We agreed that users should do "p.run().waitUntilFinish()" if they want
> to
> > block. But, if they do it now, direct, flink, spark runners will throw
> > exceptions.
> >
> > I have following jira issues opened, I am wondering could any people help
> > on them?
> >
> > https://issues.apache.org/jira/browse/BEAM-596
> > https://issues.apache.org/jira/browse/BEAM-595
> > https://issues.apache.org/jira/browse/BEAM-593
> >
> > Thanks
> > --
> > Pei
> >
> >
> >
> >
> > On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela <amitsel...@gmail.com>
> wrote:
> >
> >> +1 and Thanks!
> >>
> >> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw
> >> <rober...@google.com.invalid>
> >> wrote:
> >>
> >>> +1, sounds great. Thanks Pei.
> >>>
> >>> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <lc...@google.com.invalid
> >
> >>> wrote:
> >>>> +1 for your proposal Pei
> >>>>
> >>>> On Mon, Jul 25, 2016 at 5:54 PM, Pei He <pe...@google.com.invalid>
> >>> wrote:
> >>>>
> >>>>> Looks to me that followings are agreed:
> >>>>> (1). adding cancel() and waitUntilFinish() to PipelineResult.
> >>>>> (In streaming mode, "all data watermarks reach to infinity" is
> >>>>> considered as finished.)
> >>>>> (2). PipelineRunner.run() should return relatively quick as soon as
> >>>>> the pipeline/job is started/running. The blocking logic should be
> left
> >>>>> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> >>>>> runners that finish quickly can block run() until the execution is
> >>>>> done. So, it is cleaner to verify test results after run())
> >>>>>
> >>>>> I will send out PR for (1), and create jira issues to improve runners
> >>> for
> >>>>> (2).
> >>>>>
> >>>>> waitToRunning() is controversial, and we have several half way agreed
> >>>>> proposals.
> >>>>> I will pull them out from this thread, so we can close this proposal
> >>>>> with cancel() and waitUntilFinish(). And, i will create a jira issue
> >>>>> to track how to support ''waiting until other states".
> >>>>>
> >>>>> Does that sound good with anyone?
> >>>>>
> >>>>> Thanks
> >>>>> --
> >>>>> Pei
> >>>>>
> >>>>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
> >>>>> <rober...@google.com.invalid> wrote:
> >>>>>> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <bchamb...@apache.org
> >>>
> >>>>> wrote:
> >>>>>>> This health check seems redundant with just waiting a while and
> >> then
> >>>>>>> checking on the status, other than returning earlier in the case of
> >>>>>>> reaching a terminal state. What about adding:
> >>>>>>>
> >>>>>>> /**
> >>>>>>>  * Returns the state after waiting the specified duration. Will
> >>> return
> >>>>>>> earlier if the pipeline
> >>>>>>>  * reaches a terminal state.
> >>>>>>>  */
> >>>>>>> State getStateAfter(Duration duration);
> >>>>>>>
> >>>>>>> This seems to be a useful building block, both for the user's
> >>> pipeline
> >>>>> (in
> >>>>>>> case they wanted to build something like wait and then check
> >> health)
> >>> and
> >>>>>>> also for the SDK (to implement waitUntilFinished, etc.)
> >>>>>>
> >>>>>> A generic waitFor(Duration) which may return early if a terminal
> >> state
> >>>>>> is entered seems useful. I don't know that we need a return value
> >>>>>> here, given that we an then query the PipelineResult however we want
> >>>>>> once this returns. waitUntilFinished is simply
> >>>>>> waitFor(InfiniteDuration).
> >>>>>>
> >>>>>>> On Thu, Jul 21, 2016 at 4:11 PM Pei He <pe...@google.com.invalid>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> I am not in favor of supporting wait for every states or
> >>>>>>>> waitUntilState(...).
> >>>>>>>> One reason is PipelineResult.State is not well defined and is not
> >>>>>>>> agreed upon runners.
> >>>>>>>> Another reason is users might not want to wait for a particular
> >>> state.
> >>>>>>>> For example,
> >>>>>>>> waitUntilFinish() is to wait for a terminal state.
> >>>>>>>> So, even runners have different states, we still can define shared
> >>>>>>>> properties, such as finished/terminal.
> >>>>>>
> >>>>>> +1. Running is an intermediate state that doesn't have an obvious
> >>>>>> mapping onto all runners, which is another reason it's odd to wait
> >>>>>> until then. All runners have terminal states.
> >>>>>>
> >>>>>>>> I think when users call waitUntilRunning(), they want to make sure
> >>> the
> >>>>>>>> pipeline is up running and is healthy.
> >>>>>>>> Maybe we want to wait for at
> >>>>>>>> least one element went through the pipeline.
> >>>>>>
> >>>>>> -1, That might be a while... Also, you may not start generating data
> >>>>>> until you pipline is up.
> >>>>>>
> >>>>>>>> What about changing the waitUntilRunning() to the following?
> >>>>>>>>
> >>>>>>>> /**
> >>>>>>>> * Check if the pipeline is health for the duration.
> >>>>>>>> *
> >>>>>>>> * Return true if the pipeline is healthy at the end of duration.
> >>>>>>>> * Return false if the pipeline is not healthy at the end of
> >>> duration.
> >>>>>>>> * <p>It may return early if the pipeline is in an unrecoverable
> >>> failure
> >>>>>>>> state.
> >>>>>>>> */
> >>>>>>>> boolean PipelineResult.healthCheck(Duration duration)
> >>>>>>>>
> >>>>>>>> (I think this also addressed Robert's comment about
> >> waitToRunning())
> >>>>>>>>
> >>>>>>>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
> >>>>> <k...@google.com.invalid>
> >>>>>>>> wrote:
> >>>>>>>>> Some more comments:
> >>>>>>>>>
> >>>>>>>>>  - What are the allowed/expected state transitions prior to
> >>> RUNNING?
> >>>>>>>> Today,
> >>>>>>>>> I presume it is any nonterminal state, so it can be UNKNOWN or
> >>>>> STOPPED
> >>>>>>>>> (which really means "not yet started") prior to RUNNING. Is this
> >>>>> what we
> >>>>>>>>> want?
> >>>>>>>>>
> >>>>>>>>>  - If a job can be paused, a transition from RUNNING to STOPPED,
> >>> then
> >>>>>>>>> waitUntilPaused(Duration) makes sense.
> >>>>>>>>>
> >>>>>>>>>  - Assuming there is some polling under the hood, are runners
> >>>>> required to
> >>>>>>>>> send back a full history of transitions? Or can transitions be
> >>>>> missed,
> >>>>>>>> with
> >>>>>>>>> only the latest state retrieved?
> >>>>>>>>>
> >>>>>>>>>  - If the latter, then does waitUntilRunning() only wait until
> >>>>> RUNNING or
> >>>>>>>>> does it also return when it sees STOPPED, which could certainly
> >>>>> indicate
> >>>>>>>>> that the job transitioned to RUNNING then STOPPED in between
> >>> polls.
> >>>>> In
> >>>>>>>> that
> >>>>>>>>> case it is, today, the same as waitUntilStateIsKnown().
> >>>>>>>>>
> >>>>>>>>>  - The obvious limit of this discussion is
> >>> waitUntilState(Duration,
> >>>>>>>>> Set<State>), which is the same amount of work to implement. Am I
> >>>>> correct
> >>>>>>>>> that everyone in this thread thinks this generality is just not
> >>> the
> >>>>> right
> >>>>>>>>> thing for a user API?
> >>>>>>>>>
> >>>>>>>>>  - This enum could probably use revision. I'd chose some
> >>> combination
> >>>>> of
> >>>>>>>>> tightening the enum, making it extensible, and make some aspect
> >>> of it
> >>>>>>>>> free-form. Not sure where the best balance lies.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
> >>>>>>>> <bchamb...@google.com.invalid
> >>>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning
> >>> rather
> >>>>> than
> >>>>>>>>>> waitToRunning which reads oddly)
> >>>>>>>>>>
> >>>>>>>>>> The only reason to separate submission from waitUntilRunning
> >>> would
> >>>>> be if
> >>>>>>>>>> you wanted to kick off several pipelines in quick succession,
> >>> then
> >>>>> wait
> >>>>>>>> for
> >>>>>>>>>> them all to be running. For instance:
> >>>>>>>>>>
> >>>>>>>>>> PipelineResult p1Future = p1.run();
> >>>>>>>>>> PipelineResult p2Future = p2.run();
> >>>>>>>>>> ...
> >>>>>>>>>>
> >>>>>>>>>> p1Future.waitUntilRunning();
> >>>>>>>>>> p2Future.waitUntilRunning();
> >>>>>>>>>> ...
> >>>>>>>>>>
> >>>>>>>>>> In this setup, you can more quickly start several pipelines,
> >> but
> >>>>> your
> >>>>>>>> main
> >>>>>>>>>> program would wait and report any errors before exiting.
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
> >>>>>>>>>> <rober...@google.com.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I'm in favor of the proposal. My only question is whether we
> >>> need
> >>>>>>>>>>> PipelineResult.waitToRunning(), instead I'd propose that
> >> run()
> >>>>> block
> >>>>>>>>>>> until the pipeline's running/successfully submitted (or
> >>> failed).
> >>>>> This
> >>>>>>>>>>> would simplify the API--we'd only have one kind of wait that
> >>> makes
> >>>>>>>>>>> sense in all cases.
> >>>>>>>>>>>
> >>>>>>>>>>> What kinds of interactions would one want to have with the
> >>>>>>>>>>> PipelineResults before it's running?
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh
> >>>>>>>> <tg...@google.com.invalid>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> TestPipeline is probably the one runner that can be
> >> expected
> >>> to
> >>>>>>>> block,
> >>>>>>>>>> as
> >>>>>>>>>>>> certainly JUnit tests and likely other tests will run the
> >>>>> Pipeline,
> >>>>>>>> and
> >>>>>>>>>>>> succeed, even if the PipelineRunner throws an exception.
> >>>>> Luckily,
> >>>>>>>> this
> >>>>>>>>>>> can
> >>>>>>>>>>>> be added to TestPipeline.run(), which already has
> >> additional
> >>>>>>>> behavior
> >>>>>>>>>>>> associated with it (currently regarding the unwrapping of
> >>>>>>>>>>> AssertionErrors)
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles
> >>>>>>>>>> <k...@google.com.invalid
> >>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I like this proposal. It makes pipeline.run() seem like a
> >>>>> pretty
> >>>>>>>>>> normal
> >>>>>>>>>>>>> async request, and easy to program with. It removes the
> >>>>> implicit
> >>>>>>>>>>> assumption
> >>>>>>>>>>>>> in the prior design that main() is pretty much just "build
> >>> and
> >>>>> run
> >>>>>>>> a
> >>>>>>>>>>>>> pipeline".
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The part of this that I care about most is being able to
> >>> write
> >>>>> a
> >>>>>>>>>> program
> >>>>>>>>>>>>> (not the pipeline, but the program that launches one or
> >> more
> >>>>>>>>>> pipelines)
> >>>>>>>>>>>>> that has reasonable cross-runner behavior.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> One comment:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Jul 20, 2016 at 3:39 PM, Pei He
> >>>>> <pe...@google.com.invalid>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. PipelineRunner.run() should (but not required) do
> >>>>> non-blocking
> >>>>>>>>>> runs
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think we can elaborate on this a little bit. Obviously
> >>> there
> >>>>>>>> might
> >>>>>>>>>> be
> >>>>>>>>>>>>> "blocking" in terms of, say, an HTTP round-trip to submit
> >>> the
> >>>>> job,
> >>>>>>>> but
> >>>>>>>>>>>>> run() should never be non-terminating.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For a test runner that finishes the pipeline quickly, I
> >>> would
> >>>>> be
> >>>>>>>> fine
> >>>>>>>>>>> with
> >>>>>>>>>>>>> run() just executing the pipeline, but the PipelineResult
> >>>>> should
> >>>>>>>> still
> >>>>>>>>>>>>> emulate the usual - just always returning a terminal
> >>> status. It
> >>>>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>>>> annoying to add waitToFinish() to the end of all our
> >> tests,
> >>> but
> >>>>>>>>>> leaving
> >>>>>>>>>>> a
> >>>>>>>>>>>>> run() makes the tests only work with special blocking
> >> runner
> >>>>>>>> wrappers
> >>>>>>>>>>> (and
> >>>>>>>>>>>>> make them poor examples). A JUnit @Rule for test pipeline
> >>> would
> >>>>>>>> hide
> >>>>>>>>>> all
> >>>>>>>>>>>>> that, perhaps.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>
> >>>
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to