[ 
https://issues.apache.org/jira/browse/SPARK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230807#comment-14230807
 ] 

Josh Rosen commented on SPARK-4498:
-----------------------------------

[~andrewor14] and I just had a long discussion about this.  To recap things, I 
think that there are two distinct bugs:

1. Driver disconnection is not properly detected by the Master.
2. The application failure detection logic added in SPARK-2425 is broken.

The second bug can mask the first one: even if something is wrong with the 
logic for detecting a disconnected application / driver, the application might 
still eventually be removed, provided that the "fail an application when its 
executors fail" logic works correctly, since every executor assigned to the 
exited driver will fail.

To simulate a bug in the driver-disconnection logic, we can simply comment out 
the relevant line in the Master's {{DisassociatedEvent}} handler (sketched 
below).  After doing this, I was able to reproduce behavior similar to the 
issue reported in this ticket.  With a local standalone cluster with one 
master, one worker, and two executor slots, I started {{spark-shell}}, killed 
it with {{kill -9}}, then browsed to the Master web UI and saw executors 
continually launching and failing even though the driver had exited (this has 
been going on for 30+ minutes and is still happening).
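
For reference, the handler in question looks roughly like the sketch below. 
This is a paraphrase with placeholder maps and helper methods ({{ToyMaster}}, 
{{addressToWorker}}, {{addressToApp}}, {{finishApplication}} are stand-ins), 
not the exact Spark source; the line that finishes the application is the one 
to comment out.
{code}
import akka.actor.{Actor, Address}
import akka.remote.DisassociatedEvent

import scala.collection.mutable

// Stripped-down stand-in for the Master actor, only to show where the
// experiment hooks in; the maps and helpers are placeholders, not the real
// Spark internals.
class ToyMaster extends Actor {
  private val addressToWorker = mutable.Map.empty[Address, String]
  private val addressToApp    = mutable.Map.empty[Address, String]

  private def removeWorker(workerId: String): Unit =
    println(s"removing worker $workerId")
  private def finishApplication(appId: String): Unit =
    println(s"finishing application $appId")

  def receive: Receive = {
    case DisassociatedEvent(_, remoteAddress, _) =>
      addressToWorker.get(remoteAddress).foreach(removeWorker)
      // Commenting out the next line simulates bug 1: the Master never notices
      // that the driver disconnected, so the application stays RUNNING forever.
      addressToApp.get(remoteAddress).foreach(finishApplication)
  }
}
{code}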

I think that the logic for SPARK-2425 is completely broken in the sense that 
there are some cluster configurations for which it will _never_ kill an 
application even though every executor that it launches fails, and that there 
are very few scenarios where it will ever fail an application.  To help explain 
this, I've created a commit that factors out the failure detection logic into 
its own class: 
https://github.com/JoshRosen/spark/commit/87d7960d660b218a9a965fd7d344e2aae0250128.
  That commit also includes unit tests of the failure detection logic in 
isolation, which helps to illustrate the current bugs.

If you look at the current (broken) failure detection logic, two conditions 
must be met for an application to be marked as failed (see the sketch after 
this list):

1. No executor can be running: {{!execs.exists(_.state == ExecutorState.RUNNING)}}
2. At least {{MAX_NUM_RETRY}} consecutive executor failure events must have 
been received: {{!(appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY)}}
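
Put together, the check behaves roughly like this sketch (stand-in names such 
as {{onExecutorFailed}} and {{retryCount}} are mine, not the Master's actual 
internals):
{code}
// Toy paraphrase of the broken check (stand-in names, not the actual Master
// code).  The retry counter resets whenever a replacement executor reports
// RUNNING, which is the crux of the problem described next.
object BrokenFailureCheck {
  sealed trait ExecutorState
  case object RUNNING extends ExecutorState
  case object FAILED  extends ExecutorState

  val maxNumRetry = 10
  var retryCount  = 0   // reset to 0 whenever an executor reports RUNNING

  def onExecutorRunning(): Unit = retryCount = 0

  /** Called for each executor-failure event; returns true when the
   *  application should be marked FAILED under the current (broken) rules. */
  def onExecutorFailed(execs: Seq[ExecutorState]): Boolean = {
    retryCount += 1
    val noExecutorRunning = !execs.exists(_ == RUNNING)     // condition 1
    val retriesExhausted  = !(retryCount < maxNumRetry)     // condition 2
    noExecutorRunning && retriesExhausted
  }
}
{code}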

With the current logic, though, these conditions can almost never be met.  The 
current (hardcoded) value of {{MAX_NUM_RETRY}} is 10.  Imagine a cluster that 
only has room for 2 concurrent executors, and imagine that every executor 
fails immediately after launching.  In order for the retry count to reach the 
threshold, we must see at least 10 executor failures.  For that to happen, 
though, executor launches must occur in between some of those failures, since 
only 2 executors can be running at the same time.  When an executor fails and 
a new one is launched to replace it, the new executor enters the RUNNING state 
and sends a message back to the Master, which causes the Master to reset the 
retry count back to 0.  Therefore, it's impossible for the failure detector to 
report failure in a cluster with fewer than 10 executor slots, even if every 
executor crashes.
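
A toy simulation of that 2-slot scenario makes the arithmetic concrete (purely 
illustrative code; the point is the printed counter, which never gets past the 
number of slots):
{code}
// Toy simulation of a 2-slot cluster in which every executor crashes right
// after launching.  Because the broken logic resets the retry counter whenever
// a replacement executor reports RUNNING, the counter can never climb past the
// number of slots.
object TwoSlotSimulation extends App {
  val maxNumRetry = 10
  val slots       = 2
  var retryCount  = 0
  var maxObserved = 0

  for (_ <- 1 to 100) {          // 100 scheduling rounds
    for (_ <- 1 to slots) {      // every running executor fails
      retryCount += 1
      maxObserved = math.max(maxObserved, retryCount)
    }
    // The Master calls schedule(); each replacement registers as RUNNING,
    // which resets the count before more failures can accumulate.
    retryCount = 0
  }

  println(s"max consecutive failures seen: $maxObserved (threshold: $maxNumRetry)")
  // => max consecutive failures seen: 2 (threshold: 10)
}
{code}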

Even for larger clusters, it's still nearly impossible for the current logic 
to declare failure.  Because of the "no executor can be running" condition, 
every executor must be dead at the moment failure is declared.  The Master 
immediately calls {{schedule()}} after an executor failure, however, so in 
most cases a new executor will launch before we can see 10 consecutive 
failures (the test suite in my commit does include a contrived execution 
where failure is declared).

So, what do we do, considering that we need to fix this before 1.2.0 and as a 
bugfix to be included in 1.1.2?  I don't think that we should just revert 
SPARK-2425, since it's not okay to introduce one regression to fix another.  A 
correct application failure detector needs to strike a balance between 
protecting long-running applications from occasional executor failures and 
quickly detecting buggy applications that will never work.  Long-running 
applications imply that we can't have counters / state that monotonically 
progress towards failure, since we can assume that over an infinite execution 
there will be an infinite number of executor failures.  Intuitively, I think 
we want something window-based: if a large fraction of "recently" launched 
executors have failed, then declare that the application has failed.
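
As a strawman, a window-based detector could be as simple as the sketch below 
(illustrative only; {{WindowedFailureDetector}} and its parameters are made-up 
names, not anything that exists in Spark):
{code}
import scala.collection.mutable

// Minimal sketch of a window-based detector (illustrative only; not code that
// exists in Spark).  It remembers the outcomes of the last `windowSize`
// executor launches and declares the application failed once the failure
// fraction in that window reaches `failureThreshold`.
class WindowedFailureDetector(windowSize: Int = 10, failureThreshold: Double = 0.8) {
  private val outcomes = mutable.Queue.empty[Boolean]   // true = executor failed

  private def record(failed: Boolean): Unit = {
    outcomes.enqueue(failed)
    if (outcomes.size > windowSize) outcomes.dequeue()
  }

  def onExecutorFailed(): Unit  = record(failed = true)
  def onExecutorHealthy(): Unit = record(failed = false)

  /** True once the window is full and nearly all recent launches failed. */
  def applicationLooksDoomed: Boolean =
    outcomes.size == windowSize &&
      outcomes.count(identity).toDouble / windowSize >= failureThreshold
}
{code}
With {{windowSize = 10}} and {{failureThreshold = 0.8}}, for example, the 
application would only be declared failed once 8 of the last 10 launched 
executors had failed, so a long-running application that loses executors 
occasionally would never trip it.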

Unfortunately, the Master does not receive any signals about non-faulty 
executors (until they exit cleanly).  One solution would be to add a driver -> 
Master RPC that acknowledges that launched executors are in a good state (e.g. 
after a task has been sent to that executor).  This would allow us to properly 
implement a "fail the application if the last _n_ launched executors failed" 
condition.  However, this is still prone to false positives if a single large, 
buggy host crashes every executor launched on it.  Therefore, we might also 
want to incorporate some notion of worker blacklisting into the Master.  Any 
blacklist would still have to be application-specific, since executors can 
exit uncleanly due to application bugs and we don't want one buggy application 
to impact other applications' blacklists.
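
For example, a per-application blacklist could be as small as this sketch 
(hypothetical API, just to illustrate that the state would live with the 
application rather than globally in the Master):
{code}
import scala.collection.mutable

// Hypothetical per-application worker blacklist (not an existing or proposed
// Spark API).  The state lives with the application, so one buggy application
// cannot poison another application's view of a worker.
class PerAppWorkerBlacklist(maxFailuresPerWorker: Int = 3) {
  private val failuresByWorker = mutable.Map.empty[String, Int].withDefaultValue(0)

  def recordExecutorFailure(workerId: String): Unit =
    failuresByWorker(workerId) += 1

  def isBlacklisted(workerId: String): Boolean =
    failuresByWorker(workerId) >= maxFailuresPerWorker
}
{code}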

This proposal is getting fairly complex, though, so I'd like to see if we can 
come up with a narrower fix to replace the current logic.

Separately, I think that we should implement an explicit Driver -> Master 
heartbeat as an extra layer of defense against errors in driver-disconnection 
detection, and add more debug logging in those code paths.
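
A minimal version of such a heartbeat might look like the sketch below 
(hypothetical message and class names; nothing here exists in Spark today):
{code}
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical driver -> Master heartbeat (names invented for illustration).
// The driver periodically sends ApplicationHeartbeat(appId); the Master
// records the timestamp and can fail any application that goes silent.
case class ApplicationHeartbeat(appId: String)

class HeartbeatTracker(timeout: FiniteDuration = 60.seconds) {
  private val lastSeen = mutable.Map.empty[String, Long]

  def onHeartbeat(appId: String): Unit =
    lastSeen(appId) = System.currentTimeMillis()

  /** Applications whose driver has not been heard from within `timeout`. */
  def silentApplications(): Seq[String] = {
    val now = System.currentTimeMillis()
    lastSeen.collect { case (appId, t) if now - t > timeout.toMillis => appId }.toSeq
  }
}
{code}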

> Standalone Master can fail to recognize completed/failed applications
> ---------------------------------------------------------------------
>
>                 Key: SPARK-4498
>                 URL: https://issues.apache.org/jira/browse/SPARK-4498
>             Project: Spark
>          Issue Type: Bug
>          Components: Deploy, Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>         Environment:  - Linux dn11.chi.shopify.com 3.2.0-57-generic 
> #87-Ubuntu SMP 3 x86_64 x86_64 x86_64 GNU/Linux
>  - Standalone Spark built from 
> apache/spark#c6e0c2ab1c29c184a9302d23ad75e4ccd8060242
>  - Python 2.7.3
> java version "1.7.0_71"
> Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode)
>  - 1 Spark master, 40 Spark workers with 32 cores apiece and 60-90 GB of 
> memory apiece
>  - All client code is PySpark
>            Reporter: Harry Brundage
>            Priority: Blocker
>         Attachments: all-master-logs-around-blip.txt, 
> one-applications-master-logs.txt
>
>
> We observe the Spark standalone master failing to detect that a driver 
> application has completed after the driver process has shut down, leaving 
> that driver's resources consumed indefinitely. The master reports such 
> applications as Running, but the driver process has long since terminated. 
> The master continually spawns one executor for the application. It boots, 
> times out trying to connect to the driver application, and then dies with the 
> exception below. The master then spawns another executor on a different 
> worker, which does the same thing. The application lives until the master 
> (and workers) are restarted.
> This happens to many jobs at once, all right around the same time, two or 
> three times a day, and they all get stuck. Before and after this "blip", 
> applications start, get resources, finish, and are marked as finished 
> properly. The "blip" is mostly conjecture on my part; I have no hard evidence 
> that it exists other than the pattern I see in the Running Applications 
> table. See 
> http://cl.ly/image/2L383s0e2b3t/Screen%20Shot%202014-11-19%20at%203.43.09%20PM.png
> : the applications started before the blip 1.9 hours ago still have active 
> drivers, the applications started right at 1.9 hours ago do not, and the 
> applications started less than 1.9 hours ago (at the top of the table) do in 
> fact have active drivers.
> Deploy mode:
>  - PySpark drivers running on one node outside the cluster, scheduled by a 
> cron-like application, not master supervised
>  
> Other factoids:
>  - In most places, we call sc.stop() explicitly before shutting down our 
> driver process
>  - Here's the sum total of spark configuration options we don't set to the 
> default:
> {code}
>     "spark.cores.max": 30
>     "spark.eventLog.dir": "hdfs://nn.shopify.com:8020/var/spark/event-logs"
>     "spark.eventLog.enabled": true
>     "spark.executor.memory": "7g"
>     "spark.hadoop.fs.defaultFS": "hdfs://nn.shopify.com:8020/"
>     "spark.io.compression.codec": "lzf"
>     "spark.ui.killEnabled": true
> {code}
>  - The exception the executors die with is this:
> {code}
> 14/11/19 19:42:37 INFO CoarseGrainedExecutorBackend: Registered signal 
> handlers for [TERM, HUP, INT]
> 14/11/19 19:42:37 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 14/11/19 19:42:37 INFO SecurityManager: Changing view acls to: spark,azkaban
> 14/11/19 19:42:37 INFO SecurityManager: Changing modify acls to: spark,azkaban
> 14/11/19 19:42:37 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(spark, azkaban); 
> users with modify permissions: Set(spark, azkaban)
> 14/11/19 19:42:37 INFO Slf4jLogger: Slf4jLogger started
> 14/11/19 19:42:37 INFO Remoting: Starting remoting
> 14/11/19 19:42:38 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://driverpropsfetc...@dn13.chi.shopify.com:37682]
> 14/11/19 19:42:38 INFO Utils: Successfully started service 
> 'driverPropsFetcher' on port 37682.
> 14/11/19 19:42:38 WARN Remoting: Tried to associate with unreachable remote 
> address [akka.tcp://sparkdri...@spark-etl1.chi.shopify.com:58849]. Address is 
> now gated for 5000 ms, all messages to this address will be delivered to dead 
> letters. Reason: Connection refused: 
> spark-etl1.chi.shopify.com/172.16.126.88:58849
> 14/11/19 19:43:08 ERROR UserGroupInformation: PriviledgedActionException 
> as:azkaban (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures 
> timed out after [30 seconds]
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: 
> Unknown exception in doAs
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
>       at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>       at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
>       at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
>       at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.security.PrivilegedActionException: 
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>       ... 4 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 
> seconds]
>       at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>       at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>       at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>       at scala.concurrent.Await$.result(package.scala:107)
>       at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127)
>       at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
>       at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
>       ... 7 more
> {code}
> Cluster history:
>  - We run Spark versions built from apache/spark#master snapshots. We did not 
> observe this behaviour on {{7eb9cbc273d758522e787fcb2ef68ef65911475f}} (sorry 
> it's so old), but now observe it on 
> {{c6e0c2ab1c29c184a9302d23ad75e4ccd8060242}}. We can try new versions to 
> assist debugging.


