Repository: spark
Updated Branches:
  refs/heads/master 70a68b328 -> d1721816d


[SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore

## What changes were proposed in this pull request?

When using the Kubernetes cluster manager and spawning a Streaming workload, it 
is important to reset many spark.kubernetes.* properties that are generated by 
spark-submit, because the stale values stored in a Checkpoint would otherwise 
overwrite them on restore. The spark-submit codepath creates Kubernetes 
resources, such as a ConfigMap and a Secret, with autogenerated names, so the 
names recorded in the previous run's checkpoint no longer resolve.
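
To make the mechanism concrete, here is a minimal Scala sketch (not the actual Checkpoint code; the helper name `rebuildConf` and its signature are invented for illustration) of dropping scheduler-generated keys from the checkpointed configuration and re-reading them from the freshly submitted one:

```scala
import org.apache.spark.SparkConf

// Sketch only: rebuild the driver conf from checkpointed key/value pairs, but
// take scheduler-generated keys (e.g. spark.kubernetes.driver.pod.name) from
// the freshly submitted conf, since the old resource names no longer resolve.
def rebuildConf(checkpointedPairs: Seq[(String, String)],
                keysToReload: Seq[String]): SparkConf = {
  val restored = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
  val current = new SparkConf(loadDefaults = true) // values set by spark-submit
  keysToReload.foreach { key =>
    restored.remove(key)                                      // drop the stale value
    current.getOption(key).foreach(v => restored.set(key, v)) // use the new one
  }
  restored
}
```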

In short, this change enables checkpoint restoration for streaming workloads on 
Kubernetes; previously, a Spark Streaming workload running on Kubernetes could 
not be restored from a checkpoint after it went down.

## How was this patch tested?

This patch was tested with the twitter-streaming example on AWS, using 
checkpoints stored in S3 via the s3a:// protocol, as supported by Hadoop.
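
For reference, a minimal driver sketch of the checkpoint-restore pattern being exercised (the bucket name and batch interval are placeholders, not taken from the actual test):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder checkpoint location; any Hadoop-compatible URI (e.g. s3a://) works.
val checkpointDir = "s3a://my-bucket/streaming-checkpoints"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("twitter-streaming")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream transformations here ...
  ssc
}

// On a fresh start this builds a new context; after a crash it restores from
// the checkpoint, and with this patch the stale spark.kubernetes.* values are
// replaced by the ones generated for the new submission.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```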

This is similar to the YARN-related code for resetting a Spark Streaming 
workload, but for the Kubernetes scheduler. I'm including the init-container 
properties because, even though the discussion on the mailing list is not 
completely settled, my understanding is that they are going forward for now.

For a previous discussion, see the non-rebased work at: 
https://github.com/apache-spark-on-k8s/spark/pull/516

Author: Santiago Saavedra <ssaave...@openshine.com>

Closes #20383 from ssaavedra/fix-k8s-checkpointing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1721816
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1721816
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1721816

Branch: refs/heads/master
Commit: d1721816d26bedee3c72eeb75db49da500568376
Parents: 70a68b3
Author: Santiago Saavedra <ssaave...@openshine.com>
Authored: Fri Jan 26 15:24:06 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Jan 26 15:24:06 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala     | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1721816/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index aed67a5..ed2a896 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -53,6 +53,21 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.host",
       "spark.driver.bindAddress",
       "spark.driver.port",
+      "spark.kubernetes.driver.pod.name",
+      "spark.kubernetes.executor.podNamePrefix",
+      "spark.kubernetes.initcontainer.executor.configmapname",
+      "spark.kubernetes.initcontainer.executor.configmapkey",
+      "spark.kubernetes.initcontainer.downloadJarsResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadJarsSecretLocation",
+      "spark.kubernetes.initcontainer.downloadFilesResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadFilesSecretLocation",
+      "spark.kubernetes.initcontainer.remoteJars",
+      "spark.kubernetes.initcontainer.remoteFiles",
+      "spark.kubernetes.mountdependencies.jarsDownloadDir",
+      "spark.kubernetes.mountdependencies.filesDownloadDir",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.name",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir",
+      "spark.kubernetes.executor.limit.cores",
       "spark.master",
       "spark.yarn.jars",
       "spark.yarn.keytab",
@@ -66,6 +81,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.bindAddress")
+      .remove("spark.kubernetes.driver.pod.name")
       .remove("spark.driver.port")
     val newReloadConf = new SparkConf(loadDefaults = true)
     propertiesToReload.foreach { prop =>

