Repository: spark Updated Branches: refs/heads/master a28146568 -> 9362c5cc2
[SPARK-25449][CORE] Heartbeat shouldn't include accumulators for zero metrics ## What changes were proposed in this pull request? Heartbeats should not include accumulators whose values are zero. Heartbeats sent from executors to the driver every 10 seconds (the default interval) contain metrics and are generally on the order of a few KB. However, for large jobs with many tasks, heartbeats can grow to tens of MB, causing tasks to die with heartbeat failures. We mitigate this by not sending zero-valued accumulator updates to the driver. The behavior is controlled by the new internal config `spark.executor.heartbeat.dropZeroAccumulatorUpdates` (default: true). This patch also reads `spark.executor.heartbeatInterval` and `spark.executor.heartbeat.maxFailures` through typed config entries instead of raw string lookups. ## How was this patch tested? Added two unit tests to ExecutorSuite: "Heartbeat should drop zero accumulator updates" and "Heartbeat should not drop zero accumulator updates when the conf is disabled". Closes #22473 from mukulmurthy/25449-heartbeat. Authored-by: Mukul Murthy <mukul.mur...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9362c5cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9362c5cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9362c5cc Branch: refs/heads/master Commit: 9362c5cc273fdd09f9b3b512e2f6b64bcefc25ab Parents: a281465 Author: Mukul Murthy <mukul.mur...@gmail.com> Authored: Fri Sep 28 16:34:17 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Sep 28 16:34:17 2018 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkConf.scala | 11 +- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 40 +++++-- .../apache/spark/internal/config/package.scala | 14 +++ .../apache/spark/executor/ExecutorSuite.scala | 111 +++++++++++++++++-- .../MesosCoarseGrainedSchedulerBackend.scala | 3 +- 6 files changed, 154 insertions(+), 27 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index e0f98f1..81aa31d 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") - val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") - val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") + val executorTimeoutThresholdMs = + getTimeAsSeconds("spark.network.timeout", "120s") * 1000 + val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL) // If spark.executor.heartbeatInterval bigger than spark.network.timeout, // it will almost always cause ExecutorLostFailure. See SPARK-22754. 
- require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + - s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " + - s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.") + require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " + + s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " + + s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d943087..0a66dae 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging { // create and start the heartbeater for collecting memory metrics _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater", - conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) _heartbeater.start() // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index eba708d..61deb54 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy import 
scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.concurrent.duration._ import scala.util.control.NonFatal import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -120,7 +121,7 @@ private[spark] class Executor( } // Whether to load classes in user jars before those in Spark jars - private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST) // Whether to monitor killed / interrupted tasks private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false) @@ -170,22 +171,33 @@ private[spark] class Executor( // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] + /** + * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` + * times, it should kill itself. The default value is 60. It means we will retry to send + * heartbeats about 10 minutes because the heartbeat interval is 10s. + */ + private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES) + + /** + * Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty + * accumulators (that satisfy isZero) can make the size of the heartbeat message very large. + */ + private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES) + + /** + * Interval to send heartbeats, in milliseconds + */ + private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL) + // Executor for the heartbeat task. 
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, - "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + "executor-heartbeater", HEARTBEAT_INTERVAL_MS) // must be initialized before running startDriverHeartbeat() private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv) /** - * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` - * times, it should kill itself. The default value is 60. It means we will retry to send - * heartbeats about 10 minutes because the heartbeat interval is 10s. - */ - private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) - - /** * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each * successful heartbeat will reset it to 0. */ @@ -834,7 +846,13 @@ private[spark] class Executor( if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) + val accumulatorsToReport = + if (HEARTBEAT_DROP_ZEROES) { + taskRunner.task.metrics.accumulators().filterNot(_.isZero) + } else { + taskRunner.task.metrics.accumulators() + } + accumUpdates += ((taskRunner.taskId, accumulatorsToReport)) } } @@ -842,7 +860,7 @@ private[spark] class Executor( executorUpdates) try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) + message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key)) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/internal/config/package.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7f63422..e8b1d88 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -83,6 +83,20 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES = + ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates") + .internal() + .booleanConf + .createWithDefault(true) + + private[spark] val EXECUTOR_HEARTBEAT_INTERVAL = + ConfigBuilder("spark.executor.heartbeatInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES = + ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60) + private[spark] val EXECUTOR_JAVA_OPTIONS = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 77a7668..1f8a657 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.lang.Thread.UncaughtExceptionHandler import java.nio.ByteBuffer import java.util.Properties -import java.util.concurrent.{CountDownLatch, TimeUnit} +import 
java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Map import scala.concurrent.duration._ import scala.language.postfixOps @@ -33,22 +34,25 @@ import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito.{inOrder, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.PrivateMethodTester import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.memory.MemoryManager +import org.apache.spark.internal.config._ +import org.apache.spark.memory.TestMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.RDD -import org.apache.spark.rpc.RpcEnv -import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription} +import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout} +import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.storage.{BlockManager, BlockManagerId} +import org.apache.spark.util.{LongAccumulator, UninterruptibleThread} -class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually { +class ExecutorSuite extends SparkFunSuite + with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester { test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") { // mock some objects to make Executor.launchTask() happy @@ -252,18 +256,107 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } } + test("Heartbeat should 
drop zero accumulator updates") { + heartbeatZeroAccumulatorUpdateTest(true) + } + + test("Heartbeat should not drop zero accumulator updates when the conf is disabled") { + heartbeatZeroAccumulatorUpdateTest(false) + } + + private def withHeartbeatExecutor(confs: (String, String)*) + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { + val conf = new SparkConf + confs.foreach { case (k, v) => conf.set(k, v) } + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + val executor = + new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) + val executorClass = classOf[Executor] + + // Save all heartbeats sent into an ArrayBuffer for verification + val heartbeats = ArrayBuffer[Heartbeat]() + val mockReceiver = mock[RpcEndpointRef] + when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer(new Answer[HeartbeatResponse] { + override def answer(invocation: InvocationOnMock): HeartbeatResponse = { + val args = invocation.getArguments() + val mock = invocation.getMock + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) + } + }) + val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") + receiverRef.setAccessible(true) + receiverRef.set(executor, mockReceiver) + + f(executor, heartbeats) + } + + private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = { + val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString + withHeartbeatExecutor(c) { (executor, heartbeats) => + val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat) + + // When no tasks are running, there should be no accumulators sent in heartbeat + executor.invokePrivate(reportHeartbeat()) + // invokeReportHeartbeat(executor) + assert(heartbeats.length == 1) + assert(heartbeats(0).accumUpdates.length == 0, + "No updates should be sent when no tasks are running") + + // When we start a task with a nonzero accumulator, that should 
end up in the heartbeat + val metrics = new TaskMetrics() + val nonZeroAccumulator = new LongAccumulator() + nonZeroAccumulator.add(1) + metrics.registerAccumulator(nonZeroAccumulator) + + val executorClass = classOf[Executor] + val tasksMap = { + val field = + executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks") + field.setAccessible(true) + field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]] + } + val mockTaskRunner = mock[executor.TaskRunner] + val mockTask = mock[Task[Any]] + when(mockTask.metrics).thenReturn(metrics) + when(mockTaskRunner.taskId).thenReturn(6) + when(mockTaskRunner.task).thenReturn(mockTask) + when(mockTaskRunner.startGCTime).thenReturn(1) + tasksMap.put(6, mockTaskRunner) + + executor.invokePrivate(reportHeartbeat()) + assert(heartbeats.length == 2) + val updates = heartbeats(1).accumUpdates + assert(updates.length == 1 && updates(0)._1 == 6, + "Heartbeat should only send update for the one task running") + val accumsSent = updates(0)._2.length + assert(accumsSent > 0, "The nonzero accumulator we added should be sent") + if (dropZeroMetrics) { + assert(accumsSent == metrics.accumulators().count(!_.isZero), + "The number of accumulators sent should match the number of nonzero accumulators") + } else { + assert(accumsSent == metrics.accumulators().length, + "The number of accumulators sent should match the number of total accumulators") + } + } + } + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = { val mockEnv = mock[SparkEnv] val mockRpcEnv = mock[RpcEnv] val mockMetricsSystem = mock[MetricsSystem] - val mockMemoryManager = mock[MemoryManager] + val mockBlockManager = mock[BlockManager] when(mockEnv.conf).thenReturn(conf) when(mockEnv.serializer).thenReturn(serializer) when(mockEnv.serializerManager).thenReturn(mock[SerializerManager]) when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem) - 
when(mockEnv.memoryManager).thenReturn(mockMemoryManager) + when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf)) when(mockEnv.closureSerializer).thenReturn(serializer) + when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234)) + when(mockEnv.blockManager).thenReturn(mockBlockManager) SparkEnv.set(mockEnv) mockEnv } http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 178de30..bac0246 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config +import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( externalShufflePort, sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"), - sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) 
slave.shuffleRegistered = true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org