spark git commit: [SPARK-6331] Load new master URL if present when recovering streaming context from checkpoint

2015-03-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 426816b5c -> 95f8d1c51


[SPARK-6331] Load new master URL if present when recovering streaming context 
from checkpoint

In streaming driver recovery, when the SparkConf is reconstructed from the 
checkpointed configuration, it recovers the old master URL. This is okay if the 
cluster on which the streaming application is relaunched is the same cluster it 
was running on before. But if that cluster changes, there is no way to inject 
the new master URL of the new cluster. As a result, the restarted app tries to 
connect to the non-existent old cluster and fails.

The solution is to check whether a master URL is set in the system properties 
(by spark-submit) before recreating the SparkConf. If a new master URL is set 
in the properties, use it, as it is obviously the most relevant one. 
Otherwise load the old one (to maintain existing behavior).
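The precedence rule described above can be sketched in plain Scala; `resolveMaster` and the sample cluster URLs are illustrative helpers, not part of Spark's API:

```scala
// Sketch of the recovery rule: a master URL present in the system
// properties (set by spark-submit on relaunch) takes precedence over
// the master URL stored in the checkpoint.
def resolveMaster(checkpointedMaster: Option[String]): Option[String] =
  sys.props.get("spark.master").orElse(checkpointedMaster)

// With no override, the checkpointed master is kept:
sys.props.remove("spark.master")
println(resolveMaster(Some("spark://old-cluster:7077")))
// prints Some(spark://old-cluster:7077)

// With spark.master set (as spark-submit would set it), the new URL wins:
sys.props("spark.master") = "spark://new-cluster:7077"
println(resolveMaster(Some("spark://old-cluster:7077")))
// prints Some(spark://new-cluster:7077)
```

This mirrors what the patch does in `Checkpoint.createSparkConf()`: a fresh `SparkConf(loadDefaults = true)` picks up the system properties, and its `spark.master`, if present, overrides the checkpointed value.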

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #5024 from tdas/SPARK-6331 and squashes the following commits:

392fd44 [Tathagata Das] Fixed naming issue.
c7c0b99 [Tathagata Das] Addressed comments.
6a0857c [Tathagata Das] Updated testsuites.
222485d [Tathagata Das] Load new master URL if present when recovering 
streaming context from checkpoint

(cherry picked from commit c928796ade54f68e26bc55734a9867a046d2e3fe)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95f8d1c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95f8d1c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95f8d1c5

Branch: refs/heads/branch-1.3
Commit: 95f8d1c51dabf89a50985d488ac68977ebaf9771
Parents: 426816b
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Tue Mar 17 05:31:27 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Tue Mar 17 05:31:57 2015 -0700

--
 .../org/apache/spark/streaming/Checkpoint.scala |  7 +--
 .../spark/streaming/StreamingContext.scala  |  2 +-
 .../spark/streaming/CheckpointSuite.scala   | 21 +---
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 4 files changed, 25 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95f8d1c5/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b780282..06e82f7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,10 +43,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def createSparkConf(): SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
+    val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
+    newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
+    newSparkConf
   }
 
   def validate() {

http://git-wip-us.apache.org/repos/asf/spark/blob/95f8d1c5/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b5b6770..543224d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (
 
   private[streaming] val sc: SparkContext = {
 if (isCheckpointPresent) {
-  new SparkContext(cp_.sparkConf)
+  new SparkContext(cp_.createSparkConf())
 } else {
   sc_
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/95f8d1c5/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 03c448f..8ea91ec 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ 

spark git commit: [SPARK-6331] Load new master URL if present when recovering streaming context from checkpoint

2015-03-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e26db9be4 -> c928796ad


[SPARK-6331] Load new master URL if present when recovering streaming context 
from checkpoint

In streaming driver recovery, when the SparkConf is reconstructed from the 
checkpointed configuration, it recovers the old master URL. This is okay if the 
cluster on which the streaming application is relaunched is the same cluster it 
was running on before. But if that cluster changes, there is no way to inject 
the new master URL of the new cluster. As a result, the restarted app tries to 
connect to the non-existent old cluster and fails.

The solution is to check whether a master URL is set in the system properties 
(by spark-submit) before recreating the SparkConf. If a new master URL is set 
in the properties, use it, as it is obviously the most relevant one. 
Otherwise load the old one (to maintain existing behavior).

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #5024 from tdas/SPARK-6331 and squashes the following commits:

392fd44 [Tathagata Das] Fixed naming issue.
c7c0b99 [Tathagata Das] Addressed comments.
6a0857c [Tathagata Das] Updated testsuites.
222485d [Tathagata Das] Load new master URL if present when recovering 
streaming context from checkpoint


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c928796a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c928796a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c928796a

Branch: refs/heads/master
Commit: c928796ade54f68e26bc55734a9867a046d2e3fe
Parents: e26db9be
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Tue Mar 17 05:31:27 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Tue Mar 17 05:31:27 2015 -0700

--
 .../org/apache/spark/streaming/Checkpoint.scala |  7 +--
 .../spark/streaming/StreamingContext.scala  |  2 +-
 .../spark/streaming/CheckpointSuite.scala   | 21 +---
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 4 files changed, 25 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f88a8a0..cb4c94f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,10 +43,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def createSparkConf(): SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
+    val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
+    newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
+    newSparkConf
   }
 
   def validate() {

http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b5b6770..543224d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (
 
   private[streaming] val sc: SparkContext = {
 if (isCheckpointPresent) {
-  new SparkContext(cp_.sparkConf)
+  new SparkContext(cp_.createSparkConf())
 } else {
   sc_
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 03c448f..8ea91ec 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -146,7 +146,7 @@ class CheckpointSuite extends TestSuiteBase {
 
   // This tests whether spark conf persists through