Repository: spark Updated Branches: refs/heads/master 70a68b328 -> d1721816d
[SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore

## What changes were proposed in this pull request?

When using the Kubernetes cluster manager to run a Streaming workload, several spark.kubernetes.* properties generated by spark-submit must be reset rather than kept when restoring from a Checkpoint. The spark-submit codepath creates Kubernetes resources, such as a ConfigMap and a Secret, whose names are autogenerated, so the names stored in the checkpoint no longer resolve after a restart. In short, this change enables checkpoint restoration for streaming workloads, and thus makes Spark Streaming workloads on Kubernetes recoverable from a checkpoint when the workload goes down, which was not possible before.

This mirrors the existing YARN-related code for resetting a Spark Streaming workload, but for the Kubernetes scheduler. The initcontainer properties are included because, even though the discussion on the mailing list is not completely settled, my understanding is that they are going forward for the moment. For a previous discussion, see the non-rebased work at: https://github.com/apache-spark-on-k8s/spark/pull/516

## How was this patch tested?

This patch was tested with the twitter-streaming example on AWS, using checkpoints stored in S3 via the s3a:// protocol, as supported by Hadoop.

Author: Santiago Saavedra <ssaave...@openshine.com>

Closes #20383 from ssaavedra/fix-k8s-checkpointing.
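The restore logic described above can be sketched without any Spark dependencies. The sketch below is a hypothetical, simplified model of what Checkpoint.scala does: it uses a plain Scala `Map` in place of `SparkConf`, and the property names and values are illustrative (only the three property keys shown actually appear in the patch; the value strings are invented for the example).

```scala
// Simplified sketch of checkpoint-restore property handling, assuming a
// plain Map stands in for SparkConf. Not the actual Spark implementation.
object CheckpointRestoreSketch {
  // Properties whose checkpointed values are stale after a restart and
  // must be re-read from the current environment (a subset of the real list).
  val propertiesToReload: Seq[String] = Seq(
    "spark.driver.host",
    "spark.kubernetes.driver.pod.name",
    "spark.kubernetes.executor.podNamePrefix"
  )

  // Rebuild the configuration: drop the stale checkpointed values, then
  // overlay whatever values the current environment provides.
  def restore(checkpointed: Map[String, String],
              current: Map[String, String]): Map[String, String] =
    (checkpointed -- propertiesToReload) ++
      propertiesToReload.flatMap(k => current.get(k).map(k -> _))

  def main(args: Array[String]): Unit = {
    // Hypothetical values: the pod name in the checkpoint was autogenerated
    // by the previous spark-submit and no longer resolves.
    val checkpointed = Map(
      "spark.app.name" -> "twitter-streaming",
      "spark.kubernetes.driver.pod.name" -> "driver-old-abc123"
    )
    val current = Map(
      "spark.kubernetes.driver.pod.name" -> "driver-new-def456"
    )
    val restored = restore(checkpointed, current)
    println(restored("spark.kubernetes.driver.pod.name"))
  }
}
```

The key point is that the stale Kubernetes resource names are removed unconditionally, so a property absent from the current environment simply disappears rather than resolving to a dead resource.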
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1721816
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1721816
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1721816

Branch: refs/heads/master
Commit: d1721816d26bedee3c72eeb75db49da500568376
Parents: 70a68b3
Author: Santiago Saavedra <ssaave...@openshine.com>
Authored: Fri Jan 26 15:24:06 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Jan 26 15:24:06 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d1721816/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index aed67a5..ed2a896 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -53,6 +53,21 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.host",
       "spark.driver.bindAddress",
       "spark.driver.port",
+      "spark.kubernetes.driver.pod.name",
+      "spark.kubernetes.executor.podNamePrefix",
+      "spark.kubernetes.initcontainer.executor.configmapname",
+      "spark.kubernetes.initcontainer.executor.configmapkey",
+      "spark.kubernetes.initcontainer.downloadJarsResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadJarsSecretLocation",
+      "spark.kubernetes.initcontainer.downloadFilesResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadFilesSecretLocation",
+      "spark.kubernetes.initcontainer.remoteJars",
+      "spark.kubernetes.initcontainer.remoteFiles",
+      "spark.kubernetes.mountdependencies.jarsDownloadDir",
+      "spark.kubernetes.mountdependencies.filesDownloadDir",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.name",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir",
+      "spark.kubernetes.executor.limit.cores",
       "spark.master",
       "spark.yarn.jars",
       "spark.yarn.keytab",
@@ -66,6 +81,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.bindAddress")
+      .remove("spark.kubernetes.driver.pod.name")
       .remove("spark.driver.port")
     val newReloadConf = new SparkConf(loadDefaults = true)
     propertiesToReload.foreach { prop =>