This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new f27c951 [SPARK-27198][CORE] Heartbeat interval mismatch in driver and executor f27c951 is described below commit f27c9514ca5560acf652f944821320424a6f3860 Author: Ajith <ajith2...@gmail.com> AuthorDate: Mon Mar 25 15:38:07 2019 -0500 [SPARK-27198][CORE] Heartbeat interval mismatch in driver and executor ## What changes were proposed in this pull request? When the heartbeat interval is configured via spark.executor.heartbeatInterval without specifying units, the value is interpreted inconsistently: the driver treats it as seconds while the executor treats it as milliseconds ## How was this patch tested? Will add UTs Closes #24140 from ajithme/intervalissue. Authored-by: Ajith <ajith2...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++- core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 6c4c5c9..854dfbd 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet +import scala.concurrent.duration._ import org.apache.avro.{Schema, SchemaNormalization} @@ -610,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") - val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") + val executorHeartbeatInterval = + getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds // If 
spark.executor.heartbeatInterval bigger than spark.network.timeout, // it will almost always cause ExecutorLostFailure. See SPARK-22754. require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 97dfcc4..9ac8063 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.concurrent.duration._ import scala.util.control.NonFatal import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -831,9 +832,11 @@ private[spark] class Executor( } val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) + val heartbeatIntervalInSec = + conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) + message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval")) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org