[ https://issues.apache.org/jira/browse/SPARK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230807#comment-14230807 ]
Josh Rosen commented on SPARK-4498:
-----------------------------------

[~andrewor14] and I just had a long discussion about this. To recap, I think there are two distinct bugs:

1. Driver disconnection is not properly detected by the Master.
2. The application failure detection logic from SPARK-2425 is broken.

The second bug can mask the first one. If something is wrong with the logic for detecting a disconnected application / driver, the application might still eventually be removed if the "fail an application when its executors fail" logic worked correctly, since every executor assigned to the exited driver will fail.

To simulate bugs in the driver-disconnection logic, we can simply comment out the relevant line in the Master's {{DisassociatedEvent}} handler. After doing this, I was able to reproduce behavior similar to the issue reported in this ticket. With a local standalone cluster with one master, one worker, and two executor slots, I was able to start {{spark-shell}}, kill it with {{kill -9}}, then browse to the Master web UI and see that executors were continually launching and failing even though the driver had exited (this continued for 30+ minutes; it's still happening).

I think that the logic from SPARK-2425 is completely broken, in the sense that there are some cluster configurations for which it will _never_ kill an application even though every executor it launches fails, and that there are very few scenarios in which it will ever fail an application. To help explain this, I've created a commit that factors the failure-detection logic out into its own class: https://github.com/JoshRosen/spark/commit/87d7960d660b218a9a965fd7d344e2aae0250128. That commit also includes unit tests of the failure-detection logic in isolation, which help illustrate the current bugs.

If you look at the current (broken) failure-detection logic, two conditions must be met for an application to be marked as failed:

1. No executor can be running: {{!execs.exists(_.state == ExecutorState.RUNNING)}}
2. More than {{MAX_NUM_RETRY}} consecutive executor failure events must have been received: {{appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY}}

With the current logic, though, these conditions can almost never both be met. The current (hardcoded) value of {{MAX_NUM_RETRY}} is 10. Imagine a cluster that only has room for 2 concurrent executors, and imagine that every executor fails immediately after launching. For the retry count to be incremented past the threshold, we must have at least 10 executor failures. For that to happen, though, executor launches must occur in between some of those failures, since we can only have 2 executors running at the same time. When an executor fails and a new one is relaunched, the new executor enters the RUNNING state and sends a message back to the Master, which causes the Master to reset the retry count back to 0. Therefore, it's impossible for the failure detector to report failure in a cluster with fewer than 10 executor slots, even if every executor crashes.

Even for larger clusters, it's still nearly impossible for the current logic to declare failure. Because of the "no executor can be running" condition, all executors must be dead for it to declare failure. The Master immediately calls {{schedule()}} after an executor failure, though, so in most cases a new executor will launch before we can see 10 consecutive failures (the test suite in my commit does include a contrived execution where it declares failure).

So, what do we do, given that we need to fix this before 1.2.0 and as a bugfix to be included in 1.1.2? I don't think that we should just revert SPARK-2425, since it's not okay to introduce one regression to fix another.
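To make the counter-reset problem concrete, here is a small, self-contained Scala sketch of the two-condition check described above. All names here ({{AppModel}}, {{BrokenFailureDetectorDemo}}, etc.) are illustrative, not Spark's actual Master classes; the sketch only models the behavior described in this comment. With 2 executor slots and a crash loop in which every failure is immediately followed by a relaunch, the retry count is reset before it can ever reach 10:

```scala
import scala.collection.mutable

// Illustrative, simplified model of the current (broken) failure-detection
// logic; this is NOT Spark's actual Master code.
object BrokenFailureDetectorDemo {
  val MaxNumRetry = 10 // mirrors the hardcoded MAX_NUM_RETRY

  sealed trait ExecState
  case object Running extends ExecState
  case object Failed extends ExecState

  class AppModel {
    private val execs = mutable.Map[Int, ExecState]()
    private var retryCount = 0
    private var nextId = 0

    // A relaunched executor registers as RUNNING, which resets the retry
    // count, exactly as described above.
    def launchExecutor(): Int = {
      val id = nextId; nextId += 1
      execs(id) = Running
      retryCount = 0
      id
    }

    // Returns true iff the current logic would mark the application FAILED:
    // no executor running AND retry count over the threshold.
    def failExecutor(id: Int): Boolean = {
      execs(id) = Failed
      retryCount += 1
      val noneRunning = !execs.values.exists(_ == Running)
      noneRunning && retryCount >= MaxNumRetry
    }
  }

  def main(args: Array[String]): Unit = {
    val app = new AppModel
    var ids = List(app.launchExecutor(), app.launchExecutor()) // 2 executor slots
    var declaredFailed = false
    for (_ <- 1 to 100) {                      // 100 consecutive crashes
      declaredFailed |= app.failExecutor(ids.head)
      ids = ids.tail :+ app.launchExecutor()   // Master immediately reschedules
    }
    println(s"declared failed after 100 crashes: $declaredFailed") // prints false
  }
}
```

Even after 100 straight crashes the application is never marked failed, because each relaunch resets the counter to 0 before it can reach 10.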
A correct application failure detector needs to strike a balance between protecting long-running applications from transient failures and quickly detecting buggy applications that will never work. Long-running applications mean that we can't have counters / state that monotonically progress toward failure, since we should assume that over an effectively infinite execution there will be an effectively infinite number of executor failures.

Intuitively, I think we want something window-based: if a large fraction of "recently" launched executors have failed, then declare that the application has failed. Unfortunately, the Master does not receive any signal about non-faulty executors (until they exit cleanly). One solution would be to add a driver -> master RPC that acknowledges that launched executors are in a good state (e.g. after a task has been sent to that executor). This would allow us to properly implement a "fail the application if the last _n_ launched executors failed" condition.

However, this is still prone to false positives if a single large, buggy host crashes every executor launched on it. Therefore, we might also want to incorporate some notion of worker blacklisting into the Master. This would have to be application-specific, since executors can exit uncleanly due to application bugs, and we don't want one buggy application to impact other applications' blacklists. This proposal is getting fairly complex, though, so I'd like to see if we can come up with a narrower fix to replace the current logic.

Separately, I think that we should implement an explicit Driver -> Master heartbeat as an extra layer of defense against driver-disconnection detection errors, as well as adding lots of additional debug logging in those code paths.
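As a sketch of the window-based idea (hypothetical, not a patch against the real Master; it assumes the driver -> master "executor is healthy" ACK RPC proposed above exists), a detector could declare failure only when the last _n_ launched executors all crashed without ever being acknowledged by the driver:

```scala
import scala.collection.mutable

// Hypothetical window-based detector (not Spark code). Assumes the proposed
// driver -> master ACK RPC that confirms an executor ran at least one task.
class WindowedFailureDetector(windowSize: Int) {
  private val launchOrder = mutable.ArrayBuffer[Int]() // executor ids, oldest first
  private val acked = mutable.Set[Int]()               // driver confirmed these as healthy
  private val crashed = mutable.Set[Int]()

  def executorLaunched(id: Int): Unit = launchOrder += id
  def executorAcked(id: Int): Unit = acked += id       // e.g. after the first task ran there
  def executorCrashed(id: Int): Unit = crashed += id

  /** Fail only if the last `windowSize` launched executors all crashed un-ACKed. */
  def applicationFailed: Boolean = {
    val window = launchOrder.takeRight(windowSize)
    window.size >= windowSize && window.forall(id => crashed(id) && !acked(id))
  }
}

object WindowedFailureDetectorDemo extends App {
  val d = new WindowedFailureDetector(windowSize = 3)
  Seq(1, 2, 3).foreach(d.executorLaunched)
  Seq(1, 2, 3).foreach(d.executorCrashed)
  println(d.applicationFailed) // true: last 3 launches all crashed without an ACK

  d.executorLaunched(4)
  d.executorAcked(4)           // a healthy executor enters the window
  println(d.applicationFailed) // false
}
```

A long-running application that loses executors occasionally would never trip this detector, because acknowledged executors keep entering the window; the worker-blacklisting concern above is orthogonal and not modeled here.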
> Standalone Master can fail to recognize completed/failed applications
> ---------------------------------------------------------------------
>
> Key: SPARK-4498
> URL: https://issues.apache.org/jira/browse/SPARK-4498
> Project: Spark
> Issue Type: Bug
> Components: Deploy, Spark Core
> Affects Versions: 1.1.1, 1.2.0
> Environment: - Linux dn11.chi.shopify.com 3.2.0-57-generic #87-Ubuntu SMP 3 x86_64 x86_64 x86_64 GNU/Linux
> - Standalone Spark built from apache/spark#c6e0c2ab1c29c184a9302d23ad75e4ccd8060242
> - Python 2.7.3
> java version "1.7.0_71"
> Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode)
> - 1 Spark master, 40 Spark workers with 32 cores apiece and 60-90 GB of memory apiece
> - All client code is PySpark
> Reporter: Harry Brundage
> Priority: Blocker
> Attachments: all-master-logs-around-blip.txt, one-applications-master-logs.txt
>
>
> We observe the Spark standalone master failing to detect that a driver application has completed after the driver process has shut down, leaving that driver's resources consumed indefinitely. The master reports such applications as Running even though the driver process has long since terminated. The master continually spawns one executor for the application. It boots, times out trying to connect to the driver application, and then dies with the exception below. The master then spawns another executor on a different worker, which does the same thing. The application lives until the master (and workers) are restarted.
> This happens to many jobs at once, all right around the same time, two or three times a day, and they all get stuck. Before and after this "blip", applications start, get resources, finish, and are marked as finished properly. The "blip" is mostly conjecture on my part; I have no hard evidence that it exists other than my identification of the pattern in the Running Applications table.
> See http://cl.ly/image/2L383s0e2b3t/Screen%20Shot%202014-11-19%20at%203.43.09%20PM.png : the applications started before the blip at 1.9 hours ago still have active drivers. All the applications started 1.9 hours ago do not, and the applications started less than 1.9 hours ago (at the top of the table) do in fact have active drivers.
> Deploy mode:
> - PySpark drivers running on one node outside the cluster, scheduled by a cron-like application, not master-supervised
>
> Other factoids:
> - In most places, we call sc.stop() explicitly before shutting down our driver process
> - Here's the sum total of Spark configuration options we don't set to the default:
> {code}
> "spark.cores.max": 30
> "spark.eventLog.dir": "hdfs://nn.shopify.com:8020/var/spark/event-logs"
> "spark.eventLog.enabled": true
> "spark.executor.memory": "7g"
> "spark.hadoop.fs.defaultFS": "hdfs://nn.shopify.com:8020/"
> "spark.io.compression.codec": "lzf"
> "spark.ui.killEnabled": true
> {code}
> - The exception the executors die with is this:
> {code}
> 14/11/19 19:42:37 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
> 14/11/19 19:42:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 14/11/19 19:42:37 INFO SecurityManager: Changing view acls to: spark,azkaban
> 14/11/19 19:42:37 INFO SecurityManager: Changing modify acls to: spark,azkaban
> 14/11/19 19:42:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark, azkaban); users with modify permissions: Set(spark, azkaban)
> 14/11/19 19:42:37 INFO Slf4jLogger: Slf4jLogger started
> 14/11/19 19:42:37 INFO Remoting: Starting remoting
> 14/11/19 19:42:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverpropsfetc...@dn13.chi.shopify.com:37682]
> 14/11/19 19:42:38 INFO Utils: Successfully started service 'driverPropsFetcher' on port 37682.
> 14/11/19 19:42:38 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkdri...@spark-etl1.chi.shopify.com:58849]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: spark-etl1.chi.shopify.com/172.16.126.88:58849
> 14/11/19 19:43:08 ERROR UserGroupInformation: PriviledgedActionException as:azkaban (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     ... 4 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:107)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
>     ... 7 more
> {code}
> Cluster history:
> - We run Spark versions built from apache/spark#master snapshots. We did not observe this behaviour on {{7eb9cbc273d758522e787fcb2ef68ef65911475f}} (sorry it's so old), but now observe it on {{c6e0c2ab1c29c184a9302d23ad75e4ccd8060242}}. We can try new versions to assist debugging.