Thanks, Kenn and Robert! It got me thinking and digging.

In terms of BEAM-4775 <https://issues.apache.org/jira/browse/BEAM-4775>, which I'm
currently working on, it seems the only thing I can do now is prepare a
version that does not collect metrics after cancel(). I think I'll go in
this direction first (it looks like low-hanging fruit) and then work on the
next steps.

I think the question here is whether PipelineRunner::run is allowed to be
blocking.

IMO, it shouldn't be blocking. There's also an existing issue (at least for
Flink) to make FlinkRunner::run() and cancel() non-blocking: BEAM-593
<https://jira.apache.org/jira/browse/BEAM-593>. Solving it would allow us
to obtain the PipelineResult early (before the job finishes) and call
cancel() or any other method (e.g. getMetrics()) on it at any moment while
the job is running, regardless of its current state.
Other than that, I think we need a way to get the state of FlinkRunner (or
any other runner?) while it's still running (via a callback from the
runner?). From what I understand, state transitions in JobInvocation are
not handled as they should be (see this TODO
<https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L85>).
Once we get the state from the runner, we could set it in JobInvocation
properly.
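A minimal sketch of the non-blocking shape described above, in plain Java with no Beam dependency. Everything in it (JobHandle, State, run(), addStateListener(), incrementCounter()) is a hypothetical stand-in, not the Beam or Flink API; in Beam the "handle" role is played by PipelineResult. run() returns the handle while the job is still running, cancel() acts on the job itself, metrics stay readable after cancellation, and a state listener is one possible way for a runner to report state back to something like JobInvocation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class NonBlockingRunnerSketch {

  // Hypothetical job states, loosely modeled on Beam's PipelineResult.State.
  enum State { RUNNING, DONE, CANCELLED }

  // Hypothetical handle to a running job (the role PipelineResult plays).
  static final class JobHandle {
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Map<String, Long> counters = new ConcurrentHashMap<>();
    private final List<Consumer<State>> listeners = new CopyOnWriteArrayList<>();

    State getState() {
      return state.get();
    }

    // Immutable snapshot of the metrics; callable at any moment, whatever the state.
    Map<String, Long> getMetrics() {
      return Map.copyOf(counters);
    }

    // Hypothetical runner-to-invocation callback so state changes can be mirrored.
    void addStateListener(Consumer<State> listener) {
      listeners.add(listener);
    }

    // Cancels the job itself (not a "start job" future); no-op if already terminal.
    State cancel() {
      if (state.compareAndSet(State.RUNNING, State.CANCELLED)) {
        listeners.forEach(l -> l.accept(State.CANCELLED));
      }
      return state.get();
    }

    // Called by the runner while the job executes.
    void incrementCounter(String name) {
      counters.merge(name, 1L, Long::sum);
    }
  }

  // Hypothetical non-blocking run(): returns the handle before the job finishes.
  static JobHandle run() {
    JobHandle handle = new JobHandle();
    handle.incrementCounter("elementsProcessed"); // pretend some work already happened
    return handle;
  }

  public static void main(String[] args) {
    JobHandle handle = run();
    // Stands in for the state JobInvocation would track.
    AtomicReference<State> invocationState = new AtomicReference<>(handle.getState());
    handle.addStateListener(invocationState::set);

    System.out.println(handle.cancel());       // CANCELLED
    System.out.println(invocationState.get()); // CANCELLED
    System.out.println(handle.getMetrics());   // {elementsProcessed=1}
  }
}
```

The design point is simply that the handle exists before the job ends, so cancel() and metric queries never have to wait for (or race against) run() returning.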

On Wed, 7 Aug 2019 at 15:18, Robert Bradshaw <rober...@google.com> wrote:

> I think the question here is whether PipelineRunner::run is allowed to
> be blocking. If it is, then the futures make sense (but there's no way
> to properly cancel it). I'm OK with not being able to return metrics
> on cancel in this case, or the case the pipeline didn't even start up
> yet. Otherwise, we should quickly get a handle to the PipelineResult
> and be able to query that for all future use.
>
> On Fri, Jul 26, 2019 at 6:04 PM Kenneth Knowles <k...@apache.org> wrote:
> >
> > Took a look at the code, too. It seems like a mismatch in a few ways:
> >
> >  - PipelineRunner::run is async already and returns while the job is
> >    still running
> >  - PipelineResult is a legacy name - it is really meant to be a handle
> >    to a running job
> >  - cancel() on a future is just not really related to cancel() in a job.
> >    I would expect to cancel a job with PipelineResult::cancel and I would
> >    expect JobInvocation::cancel to cancel the "start job"
> >    RPC/request/whatever. So I would not expect metrics for a job which I
> >    decided to not even start.
> >
> > Kenn
> >
> > On Fri, Jul 26, 2019 at 8:48 AM Łukasz Gajowy <lgaj...@apache.org> wrote:
> >>
> >> Hi all,
> >>
> >> I'm currently working on BEAM-4775. The goal here is to pass portable
> >> MetricResults over the RPC API to the PortableRunner (SDK) part and allow
> >> reading them there. The metrics can be collected from the pipeline result
> >> that is available in JobInvocation's callbacks. The callbacks are
> >> registered in the start() and cancel() methods of JobInvocation. This is
> >> where my problems begin:
> >>
> >> I want to access the pipeline result and get the MetricResults from it.
> >> This is possible only in the onSuccess(PipelineResult result) method of
> >> the callbacks registered in start() and cancel() in JobInvocation. Now,
> >> when I cancel the job invocation, invocationFuture.cancel() is called,
> >> which results in onFailure(Throwable throwable) being invoked if the
> >> pipeline is still running. onFailure() has no PipelineResult parameter,
> >> so there is currently no way to collect the metrics there.
> >>
> >> My questions currently are:
> >>
> >> Should we collect metrics after the job is canceled? So far I have
> >> assumed that we should.
> >> If so, does anyone have other ideas on how to collect metrics so that
> >> we can collect them when canceling the job?
> >>
> >> PR I'm working on, with more discussion on the topic: PR 9020
> >> The current idea on how the metrics could be collected in
> >> JobInvocation: link
> >>
> >> Thanks,
> >> Łukasz
> >>
>
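For reference, the callback problem from the original message can be reproduced in plain Java, assuming CompletableFuture as a stand-in for the future and onSuccess/onFailure callbacks JobInvocation registers. Result, invoke() and the latch-based "job" are hypothetical illustrations, not Beam code. The result, and with it any metrics, only exists once the blocking run() returns; cancelling the future earlier lands in the failure branch with a CancellationException and nothing to read metrics from. That is exactly the case the interim "no metrics after cancel()" version gives up on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CancelledFutureSketch {

  // Hypothetical stand-in for PipelineResult: carries a single metric value.
  static final class Result {
    final long elementsProcessed;

    Result(long elementsProcessed) {
      this.elementsProcessed = elementsProcessed;
    }
  }

  // Models callbacks around a blocking run(); optionally cancels while the job still runs.
  static String invoke(boolean cancelBeforeFinish) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch jobFinished = new CountDownLatch(1);
    try {
      // The Result is only produced once the blocking "run" returns.
      CompletableFuture<Result> invocationFuture = CompletableFuture.supplyAsync(() -> {
        try {
          jobFinished.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        return new Result(42);
      }, executor);

      // Plays the role of onSuccess/onFailure: metrics are only reachable on success.
      CompletableFuture<String> outcome = invocationFuture.handle((result, error) ->
          error == null
              ? "success, elementsProcessed=" + result.elementsProcessed
              : "failure: " + error.getClass().getSimpleName() + " (no result, no metrics)");

      if (cancelBeforeFinish) {
        invocationFuture.cancel(true); // completes the future with a CancellationException
      } else {
        jobFinished.countDown(); // let the "job" finish normally
      }
      return outcome.join();
    } finally {
      executor.shutdownNow(); // interrupts the still-waiting task in the cancelled case
    }
  }

  public static void main(String[] args) {
    System.out.println(invoke(false)); // success, elementsProcessed=42
    System.out.println(invoke(true));  // failure: CancellationException (no result, no metrics)
  }
}
```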
