(Somehow my emails have repeatedly failed to send, so I am using my personal email account.)
Hi Piotrek,

Thanks for the feedback! I revised the doc as commented. Here's the second part, about exception classification:
https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
I put cross-links between the first and second parts.

Thanks,
Hwanju

On Fri, May 24, 2019 at 3:57 AM, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Hwanju,
>
> I looked through the document; however, I'm not the best person to review, judge, or discuss the implementation details here. I hope that Chesnay will be able to help in this regard.
>
> Piotrek
>
> > On 24 May 2019, at 09:09, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
> >
> > Hi,
> >
> > As suggested by Piotrek, the first part, execution state tracking, is now split into a separate doc:
> > https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> >
> > We'd appreciate any feedback. I am still using the same email thread to provide full context, but please let me know if it's better to start a separate email thread as well. We will share the remaining parts once they are ready.
> >
> > Thanks,
> > Hwanju
> >
> > On 5/17/19, 12:59 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
> >
> > Hi Hwanju & Chesnay,
> >
> > Regarding the various things that both of you mentioned, like accounting for state restoration separately or batch scheduling, we can always acknowledge some limitations of the initial approach and maybe address them later if we evaluate them as worth the effort.
> >
> > Generally speaking, all that you have written makes sense to me, so +1 from my side to splitting the discussion into separate threads.
> >
> > Piotrek
> >
> >> On 17 May 2019, at 08:57, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
> >>
> >> Hi Piotrek,
> >>
> >> Thanks for the insightful feedback; you have indeed spotted the trickiest parts and concerns.
> >>
> >>> 1. Do we currently account state restore as "RUNNING"? If yes, this might be incorrect from your perspective.
> >>
> >> As Chesnay said, initializeState is called in StreamTask.invoke after the transition to RUNNING, so task state restore currently happens during RUNNING. I think accounting state restore as running seems fine, since state size is the user's artifact, as long as we can detect service errors during restore (indeed, DFS issues usually surface in createCheckpointStorage (e.g., S3 server errors) and RocksDB issues in initializeState in StreamTask.invoke). We could discuss the need for a separate state to track restore and running independently, but it seems to add too many messages on common paths just for tracking.
> >>
> >>> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that the state of the job is the "minimum" of its Tasks' states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> >>
> >> Right. For RUNNING, all the tasks in the graph must have transitioned to RUNNING. For the other stages, when the first task transitions to SCHEDULED, the SCHEDULING stage begins, and when the first task transitions to DEPLOYING, the DEPLOYING stage starts. This works well especially for eager scheduling and the full-restart fail-over strategy. With individual or partial restarts, we may not need to track SCHEDULING and DEPLOYING specifically and can instead treat the job as running, relying on the progress monitor.
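To make the "minimum of task states" idea above concrete, here is a minimal sketch; the enum and class names are simplified hypotheticals, not actual Flink classes:

```java
import java.util.Arrays;
import java.util.Collection;

/**
 * Sketch: derive the job-level stage as the "minimum" of its task states,
 * so the job counts as RUNNING only once every task is RUNNING or COMPLETED.
 */
public class JobStageTracker {

    // Ordered so that ordinal() reflects execution progress (illustrative only).
    enum TaskStage { SCHEDULED, DEPLOYING, RUNNING, COMPLETED }

    static TaskStage jobStage(Collection<TaskStage> taskStages) {
        TaskStage min = TaskStage.COMPLETED;
        for (TaskStage stage : taskStages) {
            if (stage.ordinal() < min.ordinal()) {
                min = stage; // the least-advanced task determines the job's stage
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // One task still deploying keeps the whole job in the DEPLOYING stage.
        System.out.println(jobStage(Arrays.asList(
                TaskStage.RUNNING, TaskStage.DEPLOYING, TaskStage.RUNNING)));
    }
}
```

Under eager scheduling this matches the description above: SCHEDULING begins with the first SCHEDULED task, and the job is accounted as RUNNING only when the minimum reaches RUNNING.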
> >>
> >>> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule parts of the job graph in stages, for example, not scheduling the probe side of a join until the build side is done/completed.
> >>
> >> Exactly. I have roughly looked at the batch side, but not in detail yet, and I am aware of the ongoing scheduling work. The initial focus of breaking execution out into multiple states like scheduling/deploying would be streaming with eager scheduling only. We need to give more thought to how to handle batch/lazy scheduling.
> >>
> >>> What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused the job to be down for 13 minutes: 1 minute in scheduling, 1 minute deploying, 11 minutes in state restore?
> >>
> >> Basically, the initial cause is traced back from each component of downtime, and each component is accounted to a certain type, like user or system, based on the classification. So you're right. The interesting part here is secondary failures. For example, a user error causes a job to restart, but then scheduling fails due to a system issue. We need to account the failing/restarting time to the user, while the scheduling time on restart (e.g., a 5-minute timeout) goes to the system. A further example: a system error causes a job to start failing, but one of the user functions does not react to cancellation (for full restart); the prolonged failing time (e.g., a 3-minute watchdog timeout) shouldn't be accounted to the system (of course, the other way around has also been seen, e.g., FLINK-5463).
> >>
> >>> Why do you think about implementing classifiers? Couldn't we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` …, and make sure that we throw the correct exception types and handle/wrap exceptions correctly when crossing the Flink system/user code border? This way we could know exactly whether an exception occurred in the user code or in Flink code.
> >>
> >> I think the classifier here is complementary to the exception-type approach. In this context, a classifier is "f(exception) -> type". The type is used as a metric dimension, so that one can set alerts on certain types or break down downtime by type (the type is not fixed to just "user" or "system"; it can be more specific and customizable, like statebackend or network). If we wrapped exceptions perfectly, as you said, f() would be simple enough: look at the exception type and return its corresponding classification.
> >>
> >> Initially we also thought complete wrapping would be ideal. However, even inside a UDF, user code can call into the Flink framework (e.g., for state updates) or out to dependent services, which a service provider may want to classify separately. In addition, Flink lets users work with lower-level APIs like StreamOperator, which blurs the border a little. Those factors make complete wrapping challenging. Besides, stack-based classification beyond the exception type could still be needed for stuck-progress classification.
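As a sketch of the pluggable "f(exception) -> type" idea, a classifier chain might look roughly like this; the interface and type names are hypothetical, not actual Flink APIs:

```java
import java.util.List;
import java.util.Optional;

/** Sketch: a pluggable classifier maps a failure to a type used as a metric dimension. */
interface ExceptionClassifier {
    // e.g. "user", "system", "statebackend", "network"
    Optional<String> classify(Throwable failure);
}

/** Runs classifiers in order over the whole cause chain; the first match wins. */
class ExceptionClassifierChain implements ExceptionClassifier {

    private final List<ExceptionClassifier> classifiers;

    ExceptionClassifierChain(List<ExceptionClassifier> classifiers) {
        this.classifiers = classifiers;
    }

    @Override
    public Optional<String> classify(Throwable failure) {
        // Walk the cause chain so wrapped exceptions still reach a matching classifier.
        for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
            for (ExceptionClassifier classifier : classifiers) {
                Optional<String> type = classifier.classify(cause);
                if (type.isPresent()) {
                    return type;
                }
            }
        }
        return Optional.empty(); // unclassified; the caller may fall back to "unknown"
    }
}
```

A simple exception-type classifier (e.g., returning "user" for a FlinkUserException) and the class-loader-based classifier sketched further below could be plugged into the same chain, which is what makes incremental improvement possible.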
> >>
> >> Without instrumentation, one of the base classifiers that works in our environment in many cases is a user-class-loader classifier, which can detect whether an exception was thrown from a class loaded from the user JAR/artifact (although this may be less reliable in environments where users' artifacts can be installed directly in the system lib/, service providers would opt in to self-contained jar submission, keeping the system environment system-only).
> >>
> >>> One thing that might be tricky is if an error in Flink code is caused by a user's mistake.
> >>
> >> Right, this is the trickiest part. Based on our analysis of real data, the most ambiguous cases are custom serialization and out-of-resource errors. The former is usually seen in Flink runtime code rather than in a UDF. In the latter, the Flink stack is just a victim of a resource hog/leak in user code (OOM, too many open files). For the serialization issue, we've been looking at (and learning from) various serialization errors seen in the field to arrive at a reasonable classification. For out-of-resource errors, rather than a user-vs-system classification, we can tag the type as "resource", relying on dumps (e.g., heap dumps) and postmortem analysis on an as-needed basis.
> >>
> >>> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least a couple of different ways. The tricky part would be to determine whether this is caused by the user or not, but probably some simple stack-trace probing on the back-pressured task once every N seconds should solve this - similar to how sampling profilers work.
> >>
> >> Again, you're right, and as you said, this part would mostly reuse existing building blocks such as latency markers and backpressure sampling. If configured only for progress monitoring, not latency-distribution tracking, the latency marker can be lightweight: it can skip the histogram update and just record the latest timestamp at a longer period, so as not to adversely affect performance. Once stuck progress is detected, stack sampling can tell us more about the context causing the backpressure.
> >>
> >>> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
> >>
> >> Agreed. Once some initial discussion clears things up, at least at a high level, I can start more independent threads.
> >>
> >> Best,
> >> Hwanju
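A rough sketch of the user-class-loader classifier described above, assuming the classifier is handed the user-code classloader; the class name and resolution strategy are hypothetical, not Flink's actual implementation:

```java
/**
 * Sketch: classify an exception as user code if the exception class itself,
 * or any class appearing in its stack frames, was loaded by the user classloader.
 */
public class UserClassLoaderClassifier {

    private final ClassLoader userClassLoader;

    public UserClassLoaderClassifier(ClassLoader userClassLoader) {
        this.userClassLoader = userClassLoader;
    }

    public boolean isUserCodeFailure(Throwable failure) {
        for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
            if (definedByUserCode(cause.getClass())) {
                return true; // a user-defined exception type
            }
            for (StackTraceElement frame : cause.getStackTrace()) {
                try {
                    Class<?> frameClass =
                            Class.forName(frame.getClassName(), false, userClassLoader);
                    if (definedByUserCode(frameClass)) {
                        return true; // thrown from within a class in the user jar
                    }
                } catch (ClassNotFoundException | LinkageError ignored) {
                    // frame class not resolvable from the user classloader; skip it
                }
            }
        }
        return false;
    }

    private boolean definedByUserCode(Class<?> clazz) {
        // A child-first user classloader reports itself for classes in the user jar;
        // system and bootstrap classes report a parent (or null) classloader instead.
        return clazz.getClassLoader() == userClassLoader;
    }
}
```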
> >>
> >> On 5/16/19, 2:44 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
> >>
> >> Hi Hwanju,
> >>
> >> Thanks for starting the discussion. Definitely, any improvement in this area would be very helpful and valuable. Generally speaking, +1 from my side, as long as we make sure that such changes either do not add performance overhead (which I think they shouldn't) or are optional.
> >>
> >>> Firstly, we need to account time for each stage of task execution, such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions.
> >>
> >> A couple of questions/remarks:
> >> 1. Do we currently account state restore as "RUNNING"? If yes, this might be incorrect from your perspective.
> >> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that the state of the job is the "minimum" of its Tasks' states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> >> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule parts of the job graph in stages, for example, not scheduling the probe side of a join until the build side is done/completed.
> >>
> >>> Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by the Java exception reported to the job manager on task failure or by an unhealthy task manager (Flink already maintains a cause, but it can be associated with an execution stage for causal tracking).
> >>
> >> What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused the job to be down for 13 minutes: 1 minute in scheduling, 1 minute deploying, 11 minutes in state restore?
> >>
> >>> Thirdly, the downtime reason should be classified as a user- or system-induced failure. This needs an exception classifier that draws the line between user-defined functions (or the public API) and the Flink runtime; reaching 100% accuracy in one shot is particularly challenging, due to the empirical nature of classification and custom logic injection such as serialization, so pluggable classifier filters are a must-have to enable incremental improvement.
> >>
> >> Why do you think about implementing classifiers? Couldn't we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` …, and make sure that we throw the correct exception types and handle/wrap exceptions correctly when crossing the Flink system/user code border? This way we could know exactly whether an exception occurred in the user code or in Flink code.
> >>
> >> One thing that might be tricky is if an error in Flink code is caused by a user's mistake.
> >>
> >>> Fourthly, stuck progress
> >>
> >> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least a couple of different ways. The tricky part would be to determine whether this is caused by the user or not, but probably some simple stack-trace probing on the back-pressured task once every N seconds should solve this - similar to how sampling profilers work.
> >>
> >> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
> >>
> >> Piotrek
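To illustrate the stack-trace probing idea from the exchange above, here is a minimal sampling sketch; the class name, scheduling period, and the user-frame predicate are hypothetical assumptions, not existing Flink machinery:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

/**
 * Sketch: sample a back-pressured task thread's stack every N seconds, similar
 * to a sampling profiler, and track how often the thread is busy in user code.
 */
public class StuckProgressSampler {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong totalSamples = new AtomicLong();
    private final AtomicLong userCodeSamples = new AtomicLong();

    public void start(Thread taskThread, long periodSeconds,
                      Predicate<StackTraceElement> isUserFrame) {
        scheduler.scheduleAtFixedRate(() -> {
            StackTraceElement[] stack = taskThread.getStackTrace();
            if (stack.length == 0) {
                return; // thread not currently runnable; nothing to attribute
            }
            totalSamples.incrementAndGet();
            // Attribute the sample to the innermost frame, as a profiler would.
            if (isUserFrame.test(stack[0])) {
                userCodeSamples.incrementAndGet();
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /** Fraction of samples in which the task was executing user code. */
    public double userCodeRatio() {
        long total = totalSamples.get();
        return total == 0 ? 0.0 : (double) userCodeSamples.get() / total;
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

A high userCodeRatio() during a stuck period would suggest a user-induced cause (e.g., a slow or blocked UDF), while a low one would point at the runtime.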
> >>
> >>> On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion thread about a project we are considering for Flink operational improvement in production. We would like to start the conversation early, before detailed design, so any high-level feedback is welcome.
> >>>
> >>> For service providers who operate Flink in a multi-tenant environment, such as AWS Kinesis Data Analytics, it is crucial to measure application health and to clearly differentiate application unavailability caused by the Flink framework or the service environment from that caused by application code. While the current Flink metrics represent overall job availability over time, they still need improvement to give Flink operators better insight into detailed application availability. The current availability metrics, such as uptime and downtime, measure time based on the running state of a job, which does not necessarily represent the actual running state (after a job transitions to RUNNING, each task still needs to be scheduled and deployed before it can run user-defined functions). The detailed view should give operators visibility into 1) how long each specific stage takes (e.g., task scheduling or deployment), 2) which failure was introduced in which stage, leading to job downtime, and 3) whether such a failure is classified as a user code error (e.g., an uncaught exception from a user-defined function) or a platform/environmental error (e.g., a checkpointing issue, unhealthy nodes hosting job/task managers, or a Flink bug). The last one is particularly needed to allow Flink operators to define an SLA where only a small fraction of downtime may be introduced by service faults. All of these visibility enhancements can help the community detect and fix Flink runtime issues quickly, whereby Flink can become a more robust system for hosting data analytics applications.
> >>>
> >>> The current proposal is as follows. Firstly, we need to account time for each stage of task execution, such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions. Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by the Java exception reported to the job manager on task failure or by an unhealthy task manager (Flink already maintains a cause, but it can be associated with an execution stage for causal tracking). Thirdly, the downtime reason should be classified as a user- or system-induced failure. This needs an exception classifier that draws the line between user-defined functions (or the public API) and the Flink runtime; reaching 100% accuracy in one shot is particularly challenging, due to the empirical nature of classification and custom logic injection such as serialization, so pluggable classifier filters are a must-have to enable incremental improvement. Fourthly, stuck progress, where a task is apparently running but unable to process data, generally manifesting itself as long backpressure, can be monitored as a higher-level job availability signal, and the runtime can determine whether the cause of being stuck is user-induced (e.g., under-provisioned resources or a user function bug) or system-induced (a deadlock or livelock in the Flink runtime). Finally, all the detailed tracking information and metrics are exposed via REST and Flink metrics, so that the Flink dashboard can present enhanced information about job execution/availability and operators can set alarms appropriately on the metrics.
> >>>
> >>> Best,
> >>> Hwanju
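Tying the first three points of the proposal together, a sketch of how per-stage downtime might be accumulated against a classified failure cause; the stage and failure-type enums and the class itself are illustrative assumptions, not actual Flink code:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.EnumMap;
import java.util.Map;

/**
 * Sketch: accumulate downtime per (execution stage, failure type), so that, e.g.,
 * restart scheduling time after a user error is still attributed to the right cause.
 */
public class DowntimeAccounting {

    enum Stage { SCHEDULING, DEPLOYING, RESTORING, RUNNING }
    enum FailureType { USER, SYSTEM, RESOURCE, UNKNOWN }

    private final Map<Stage, Map<FailureType, Duration>> downtime =
            new EnumMap<>(Stage.class);
    private Stage currentStage = Stage.SCHEDULING;
    private FailureType currentCause = FailureType.UNKNOWN;
    private Instant stageStart = Instant.now();

    /** Record a classified failure; subsequent recovery stages are charged to it. */
    public synchronized void onFailure(FailureType cause) {
        currentCause = cause;
    }

    /** On every stage transition, charge the time elapsed in the previous stage. */
    public synchronized void transitionTo(Stage next) {
        Instant now = Instant.now();
        if (currentStage != Stage.RUNNING) { // only non-running time counts as downtime
            downtime.computeIfAbsent(currentStage, s -> new EnumMap<>(FailureType.class))
                    .merge(currentCause, Duration.between(stageStart, now), Duration::plus);
        }
        currentStage = next;
        stageStart = now;
    }

    public synchronized Map<Stage, Map<FailureType, Duration>> snapshot() {
        return downtime;
    }
}
```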