(Somehow my email has failed to be sent multiple times, so I am using my
personal email account)

Hi,

Piotrek - Thanks for the feedback! I revised the doc as commented.

Here's the second part about exception classification -
https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
I put cross-links between the first and the second.

Thanks,
Hwanju

On Fri, May 24, 2019 at 3:57 AM, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Hwanju,
>
> I looked through the document; however, I’m not the best person to
> review, judge, or discuss the implementation details here. I hope that
> Chesnay will be able to help in this regard.
>
> Piotrek
>
> > On 24 May 2019, at 09:09, Kim, Hwanju <hwanj...@amazon.com.INVALID>
> wrote:
> >
> > Hi,
> >
> > As suggested by Piotrek, the first part, execution state tracking, is
> now split to a separate doc:
> >
> https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> >
> > We'd appreciate any feedback. I am still using the same email thread to
> provide full context, but please let me know if it's better to have a
> separate email thread as well. We will share the remaining parts once
> they are ready.
> >
> > Thanks,
> > Hwanju
> >
> > On 5/17/19, 12:59 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
> >
> >    Hi Hwanju & Chesnay,
> >
> >    Regarding the various things that both of you mentioned, like accounting
> for state restoration separately or batch scheduling, we can always
> acknowledge some limitations of the initial approach and maybe
> address them later if we judge it worth the effort.
> >
> >    Generally speaking, all that you have written makes sense to me, so +1
> from my side to split the discussion into separate threads.
> >
> >    Piotrek
> >
> >> On 17 May 2019, at 08:57, Kim, Hwanju <hwanj...@amazon.com.INVALID>
> wrote:
> >>
> >> Hi Piotrek,
> >>
> >> Thanks for the insightful feedback; indeed you identified the trickiest
> parts and concerns.
> >>
> >>> 1. Do we currently account state restore as “RUNNING”? If yes, this
> might be incorrect from your perspective.
> >>
> >> As Chesnay said, initializeState is called in StreamTask.invoke after
> transitioning to RUNNING, so task state restore currently happens during
> RUNNING. I think accounting state restore as running seems fine, since
> state size is the user's artifact, as long as we can detect service errors
> during restore (indeed, DFS issues usually happen at
> createCheckpointStorage (e.g., S3 server error) and RocksDB issues happen
> at initializeState in StreamTask.invoke). We can discuss the need for a
> separate state that tracks restore apart from running, but it seems
> to add too many messages in common paths just for tracking.
> >>
> >>> 2a. This might be more tricky if various Tasks are in various stages.
> For example in streaming, it should be safe to assume that the state of the
> job is the “minimum” of its Tasks’ states, so the Job should be accounted as
> RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> >>
> >> Right. The job counts as RUNNING only once all the tasks in the graph
> have transitioned to RUNNING. For the others, the SCHEDULING stage begins
> when the first task transitions to SCHEDULED, and the DEPLOYING stage
> begins when the first task transitions to DEPLOYING. This would be fine
> especially for eager scheduling and the full-restart fail-over strategy.
> With individual or partial restart, we may not need to specifically track
> the SCHEDULING and DEPLOYING states; we can treat the job as running and
> rely on the progress monitor.
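The stage rule described in the reply above can be sketched roughly as follows. This is only an illustration, not Flink code: `TaskState`, `JobStage`, and `JobStageTracker` are hypothetical names (not Flink's `ExecutionState` or `JobStatus`), and counting finished tasks as compatible with RUNNING is an assumption taken from Piotrek's remark.

```java
import java.util.List;

// Hypothetical task states, ordered by progress (not Flink's ExecutionState).
enum TaskState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED }

// Hypothetical job-level stages used for time accounting.
enum JobStage { CREATED, SCHEDULING, DEPLOYING, RUNNING }

final class JobStageTracker {
    // Derives the job stage from the current task states:
    // RUNNING only if every task is RUNNING or FINISHED; otherwise the
    // stage is set by the furthest state that any single task has reached.
    static JobStage derive(List<TaskState> tasks) {
        if (tasks.isEmpty()) {
            return JobStage.CREATED;
        }
        boolean allRunning = true;
        boolean anyDeploying = false;
        boolean anyScheduled = false;
        for (TaskState t : tasks) {
            if (t != TaskState.RUNNING && t != TaskState.FINISHED) {
                allRunning = false;
            }
            // Enum comparison uses declaration order, i.e. progress order.
            if (t.compareTo(TaskState.DEPLOYING) >= 0) {
                anyDeploying = true;
            }
            if (t.compareTo(TaskState.SCHEDULED) >= 0) {
                anyScheduled = true;
            }
        }
        if (allRunning) {
            return JobStage.RUNNING;
        }
        if (anyDeploying) {
            return JobStage.DEPLOYING;
        }
        return anyScheduled ? JobStage.SCHEDULING : JobStage.CREATED;
    }
}
```

Under this sketch, a job with one task still DEPLOYING and the rest RUNNING is accounted as DEPLOYING, which matches the eager-scheduling, full-restart case discussed above; partial restart would need a different rule.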
> >>
> >>> 2b. However in batch - including DataStream jobs running against
> bounded data streams, like Blink SQL - this might be more tricky, since
> there are ongoing efforts to schedule part of the job graphs in stages. For
> example do not schedule probe side of the join until build side is
> done/completed.
> >>
> >> Exactly. I have looked roughly at the batch side, but not in detail yet,
> and am aware of the ongoing scheduling work. The initial focus of breaking
> out multiple states like scheduling/deploying would be only on streaming
> with eager scheduling. I need to give more thought to how to deal with
> batch/lazy scheduling.
> >>
> >>> What exactly would you like to report here? A list of exceptions with
> the downtime caused by each, for example: exception X caused a job to be
> down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes
> state restore?
> >>
> >> Basically, the initial cause is traced back from each component of
> downtime, and that component is attributed to a certain type, like user or
> system, based on the classification. So you're right. The interesting part
> here is secondary failure. For example, a user error causes a job to
> restart, but then scheduling fails due to a system issue. We need to
> attribute the failing and restarting time to user, while the scheduling
> time on restart (e.g., a 5 min timeout) goes to system. A further example:
> a system error causes a job to fail, but one of the user functions does not
> react to cancellation (for full restart); the prolonged failing time (e.g.,
> a 3 min watchdog timeout) shouldn’t be attributed to system (of course, the
> other way around has been seen -- e.g., FLINK-5463).
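One way to picture the secondary-failure accounting described above is as a list of downtime segments, each tagged with the type its own cause was classified as, then summed per type. The sketch below is only an illustration: `DowntimeSegment` and `DowntimeAccounting` are hypothetical names, and the durations in the usage note are made-up values in the spirit of the example in the reply.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One contiguous piece of downtime, spent in one stage and attributed to
// one cause type (hypothetical model, not a Flink class).
final class DowntimeSegment {
    final String stage;
    final Duration duration;
    final String causeType;

    DowntimeSegment(String stage, Duration duration, String causeType) {
        this.stage = stage;
        this.duration = duration;
        this.causeType = causeType;
    }
}

final class DowntimeAccounting {
    // Sums downtime per cause type. A secondary failure simply starts a new
    // segment with its own cause type, so it is not charged to the initial cause.
    static Map<String, Duration> byCauseType(List<DowntimeSegment> segments) {
        Map<String, Duration> totals = new LinkedHashMap<>();
        for (DowntimeSegment s : segments) {
            totals.merge(s.causeType, s.duration, Duration::plus);
        }
        return totals;
    }
}
```

For instance, segments of (FAILING, 10 s, user), (RESTARTING, 20 s, user), and (SCHEDULING, 5 min, system) would yield 30 s for user and 5 min for system.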
> >>
> >>> Why do you think about implementing classifiers? Couldn’t we classify
> exceptions by exception type, like `FlinkUserException`,
> `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that
> we throw correct exception types + handle/wrap exceptions correctly when
> crossing Flink system/user code border? This way we could know exactly
> whether exception occurred in the user code or in Flink code.
> >>
> >> I think the classifier here is complementary to the exception type
> approach. In this context, a classifier is "f(exception) -> type". The type
> is used as a metric dimension, to set alerts on certain types or to break
> down downtime by type (the type is not fixed to "user" or "system" but can
> be more specific and customizable, like statebackend and network). If we do
> wrap exceptions perfectly as you said, f() is simple enough: it looks at
> the exception type and returns the corresponding type.
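The "f(exception) -> type" idea with pluggable filters could look roughly like the sketch below. It is not an existing Flink API: `FailureClassifier`, `ClassifierChain`, the `"unknown"` fallback, and the depth limit on the cause chain are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// A pluggable classifier: returns a type if it recognizes the exception,
// or empty to let the next classifier try (hypothetical interface).
interface FailureClassifier {
    Optional<String> classify(Throwable t);

    // The simple case from the discussion: classify purely by exception type.
    static FailureClassifier byType(Class<? extends Throwable> cls, String type) {
        return t -> cls.isInstance(t) ? Optional.of(type) : Optional.empty();
    }
}

final class ClassifierChain {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic causes
    private final List<FailureClassifier> classifiers = new ArrayList<>();

    ClassifierChain add(FailureClassifier c) {
        classifiers.add(c);
        return this;
    }

    // Walks the cause chain from the outermost exception inward and returns
    // the first type any registered classifier reports.
    String classify(Throwable failure) {
        Throwable t = failure;
        for (int depth = 0; t != null && depth < MAX_CAUSE_DEPTH; depth++) {
            for (FailureClassifier c : classifiers) {
                Optional<String> type = c.classify(t);
                if (type.isPresent()) {
                    return type.get();
                }
            }
            t = t.getCause();
        }
        return "unknown";
    }
}
```

A chain built with `byType(java.io.IOException.class, "network")` would classify a `RuntimeException` wrapping an `IOException` as `"network"`; more specific classifiers (stack-based, class-loader-based) can be added to the same chain incrementally.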
> >>
> >> Initially we also thought complete wrapping would be ideal. However,
> even inside a UDF, code can call into the Flink framework (e.g., a state
> update) or call out to dependent services, which a service provider may
> want to classify separately. In addition, Flink allows users to use
> lower-level APIs like StreamOperator, which blurs the border a little.
> Those would make complete wrapping challenging. Besides, stack-based
> classification beyond exception type could still be needed for stuck
> progress classification.
> >>
> >> Without instrumentation, one of the base classifiers that works for our
> environment in many cases is a user-class-loader classifier, which can
> detect whether an exception is thrown from a class loaded from the user
> JAR/artifact (although this may be less desirable in an environment where
> users' artifacts can be installed directly in the system lib/, service
> providers would likely opt for self-contained jar submission, keeping the
> system environment system-only).
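A user-class-loader classifier of the kind mentioned above might be sketched as follows. This is an assumption-laden illustration rather than the authors' implementation: `UserCodeClassifier` is a hypothetical name, only the top stack frame (the throw site) is inspected, and the class-loader check assumes user classes are defined by a dedicated loader, not by the system class path.

```java
import java.util.function.Predicate;

final class UserCodeClassifier {
    private final Predicate<String> isUserClass;

    UserCodeClassifier(Predicate<String> isUserClass) {
        this.isUserClass = isUserClass;
    }

    // Builds a predicate that is true when the named class is defined by the
    // given user class loader itself (not by one of its parents).
    static Predicate<String> loadedBy(ClassLoader userLoader) {
        return className -> {
            try {
                Class<?> cls = Class.forName(className, false, userLoader);
                return cls.getClassLoader() == userLoader;
            } catch (ClassNotFoundException | LinkageError e) {
                return false; // unresolvable classes are not treated as user code
            }
        };
    }

    // Returns "user" if the exception was thrown from a user class, judged by
    // the top stack frame only; everything else is reported as "system".
    String classify(Throwable t) {
        StackTraceElement[] frames = t.getStackTrace();
        if (frames.length > 0 && isUserClass.test(frames[0].getClassName())) {
            return "user";
        }
        return "system";
    }
}
```

Looking only at the throw site keeps the check cheap, but it would report an exception raised inside a library called from user code as "system"; scanning further down the stack is a possible refinement.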
> >>
> >>>  One thing that might be tricky is if error in Flink code is caused by
> user’s mistake.
> >>
> >> Right, this is the trickiest part. Based on our analysis of real
> data, the most ambiguous ones are custom serialization and out-of-resource
> errors. The former is usually seen in Flink runtime code rather than in
> the UDF. In the latter, the Flink stack is just a victim of a resource
> hog/leak in user code (OOM, too many open files). For the serialization
> issue, we've been looking at (and learning from) various serialization
> errors seen in the field to get a reasonable classification. For
> out-of-resource errors, rather than a user vs. system classification, we
> can tag the type as "resource", relying on dumps (e.g., heap dump) and
> postmortem analysis on an as-needed basis.
> >>
> >>> Hmmm, this might be tricky. We can quite easily detect which exact
> Task is causing back pressure in at least a couple of different ways. The
> tricky part would be to determine whether this is caused by the user or
> not, but probably some simple stack trace probing on the back-pressured
> task once every N seconds should solve this, similar to how sampling
> profilers work.
> >>
> >> Again you're right, and like you said, this part would mostly reuse
> the existing building blocks such as latency markers and backpressure
> sampling. If configured only for progress monitoring, without latency
> distribution tracking, the latency marker can be lightweight: it skips the
> histogram update and just updates the latest timestamp, with a longer
> period so as not to adversely affect performance. Once stuck progress is
> detected, stack sampling can tell us more about the context that causes
> the backpressure.
> >>
> >>> Luckily it seems like those four issues/proposals could be
> implemented/discussed independently or in stages.
> >> Agreed. Once some initial discussion clears things up, at least at a
> high level, I can start more independent threads.
> >>
> >> Best,
> >> Hwanju
> >>
> >> On 5/16/19, 2:44 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
> >>
> >>   Hi Hwanju,
> >>
> >>   Thanks for starting the discussion. Definitely any improvement in
> this area would be very helpful and valuable. Generally speaking +1 from my
> side, as long as we make sure that either such changes do not add
> performance overhead (which I think they shouldn’t) or they are optional.
> >>
> >>> Firstly, we need to account time for each stage of task execution such
> as scheduling, deploying, and running, to enable better visibility of how
> long a job takes in which stage while not running user functions.
> >>
> >>   Couple of questions/remarks:
> >>   1. Do we currently account state restore as “RUNNING”? If yes, this
> might be incorrect from your perspective.
> >>   2a. This might be more tricky if various Tasks are in various stages.
> For example in streaming, it should be safe to assume that the state of the
> job is the “minimum” of its Tasks’ states, so the Job should be accounted as
> RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> >>   2b. However in batch - including DataStream jobs running against
> bounded data streams, like Blink SQL - this might be more tricky, since
> there are ongoing efforts to schedule part of the job graphs in stages. For
> example do not schedule probe side of the join until build side is
> done/completed.
> >>
> >>> Secondly, any downtime in each stage can be associated with a failure
> cause, which could be identified by Java exception notified to job manager
> on task failure or unhealthy task manager (Flink already maintains a cause
> but it can be associated with an execution stage for causal tracking)
> >>
> >>   What exactly would you like to report here? A list of exceptions with
> the downtime caused by each, for example: exception X caused a job to be
> down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes
> state restore?
> >>
> >>> Thirdly, downtime reason should be classified into user- or
> system-induced failure. This needs exception classifier by drawing the line
> between user-defined functions (or public API) and Flink runtime — This is
> particularly challenging to have 100% accuracy at one-shot due to empirical
> nature and custom logic injection like serialization, so pluggable
> classifier filters are must-have to enable incremental improvement.
> >>
> >>   Why do you think about implementing classifiers? Couldn’t we classify
> exceptions by exception type, like `FlinkUserException`,
> `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that
> we throw correct exception types + handle/wrap exceptions correctly when
> crossing Flink system/user code border? This way we could know exactly
> whether exception occurred in the user code or in Flink code.
> >>
> >>   One thing that might be tricky is if error in Flink code is caused by
> user’s mistake.
> >>
> >>
> >>> Fourthly, stuck progress
> >>
> >>   Hmmm, this might be tricky. We can quite easily detect which exact
> Task is causing back pressure in at least a couple of different ways. The
> tricky part would be to determine whether this is caused by the user or
> not, but probably some simple stack trace probing on the back-pressured
> task once every N seconds should solve this, similar to how sampling
> profilers work.
> >>
> >>   Luckily it seems like those four issues/proposals could be
> implemented/discussed independently or in stages.
> >>
> >>   Piotrek
> >>
> >>> On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID>
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am Hwanju at AWS Kinesis Analytics. We would like to start a
> discussion thread about a project we are considering for Flink operational
> improvement in production. We would like to start the conversation early,
> before detailed design, so any high-level feedback would be welcome.
> >>>
> >>> For service providers who operate Flink in a multi-tenant environment,
> such as AWS Kinesis Data Analytics, it is crucial to measure application
> health and clearly differentiate application unavailability caused by the
> Flink framework or service environment from that caused by application
> code. The current metrics of Flink represent overall job availability over
> time, but they still need to be improved to give Flink operators better
> insight into detailed application availability. The current availability
> metrics, such as uptime and downtime, measure time based on the running
> state of a job, which does not necessarily represent the actual running
> state of a job (after a job transitions to running, each task still has to
> be scheduled/deployed in order to run user-defined functions). The detailed
> view should give operators visibility into 1) how long each specific stage
> takes (e.g., task scheduling or deployment), 2) what failure is introduced
> in which stage, leading to job downtime, and 3) whether such a failure is
> classified as a user code error (e.g., uncaught exception from a
> user-defined function) or a platform/environmental error (e.g.,
> checkpointing issue, unhealthy nodes hosting job/task managers, Flink bug).
> The last one is particularly needed to allow Flink operators to define an
> SLA where only a small fraction of downtime should be introduced by service
> faults. All of these visibility enhancements can help the community detect
> and fix Flink runtime issues quickly, whereby Flink can become a more
> robust operating system for hosting data analytics applications.
> >>>
> >>> The current proposal is as follows. Firstly, we need to account time
> for each stage of task execution, such as scheduling, deploying, and
> running, to enable better visibility of how long a job takes in which stage
> while not running user functions. Secondly, any downtime in each stage can
> be associated with a failure cause, which could be identified by the Java
> exception notified to the job manager on task failure or by an unhealthy
> task manager (Flink already maintains a cause, but it can be associated
> with an execution stage for causal tracking). Thirdly, the downtime reason
> should be classified as user- or system-induced failure. This needs an
> exception classifier that draws the line between user-defined functions (or
> public API) and the Flink runtime. It is particularly challenging to reach
> 100% accuracy in one shot, due to the empirical nature of the problem and
> custom logic injection like serialization, so pluggable classifier filters
> are a must-have to enable incremental improvement. Fourthly, stuck
> progress, where a task is apparently running but unable to process data
> (generally manifesting itself as long backpressure), can be monitored as
> higher-level job availability, and the runtime can determine whether the
> cause is the user (e.g., under-provisioned resources, user function bug) or
> the system (deadlock or livelock in the Flink runtime). Finally, all the
> detailed tracking information and metrics are exposed via REST and Flink
> metrics, so that the Flink dashboard can show enhanced information about
> job execution/availability and operators can set alarms appropriately on
> metrics.
> >>>
> >>> Best,
> >>> Hwanju
> >>>
> >>
> >>
> >>
> >
> >
> >
>
>
