Repository: spark
Updated Branches:
  refs/heads/branch-1.6 089ba81d1 -> 9d6238859


[SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from checkpoint

Currently, the YARN AM proxy filter configuration is recovered from the
checkpoint file when a Spark Streaming application is restarted, which leads to
two unwanted behaviors:

1. A wrong RM address if the RM has been redeployed after a failure.
2. A wrong proxyBase: the restarted application gets a new app id, so the
   checkpointed proxyBase still refers to the old app id.

So instead of being recovered from the checkpoint file, these configurations
should be reloaded each time the application starts.

This problem only exists in YARN cluster mode. In YARN client mode, these
configurations are updated through the RPC message `AddWebUIFilter`.
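
As a rough illustration of the reload described above, here is a minimal
sketch that uses plain Maps in place of the recovered and freshly loaded
SparkConf, so it runs without Spark. The object name `ProxyFilterReload`, the
helper `overlayFilterParams`, and all sample values are hypothetical; only the
filter class name and the key prefix are taken from the patch.

```scala
// Sketch only: plain Maps stand in for the recovered and reloaded SparkConf.
object ProxyFilterReload {
  // Filter class name and key prefix as they appear in the patch.
  val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
  val filterPrefix = s"spark.$filter.param."

  // Hypothetical helper: copy only the proxy filter parameters of the current
  // run over the settings recovered from the checkpoint. Keys equal to the bare
  // prefix (no parameter name after it) are skipped, as in the patch.
  def overlayFilterParams(
      recovered: Map[String, String],
      reloaded: Map[String, String]): Map[String, String] = {
    val fresh = reloaded.filter { case (k, _) =>
      k.startsWith(filterPrefix) && k.length > filterPrefix.length
    }
    recovered ++ fresh
  }

  def main(args: Array[String]): Unit = {
    // Illustrative parameter name and made-up addresses.
    val key = filterPrefix + "PROXY_URI_BASES"
    val recovered = Map(
      key -> "http://old-rm:8088/proxy/application_1_0001",
      "spark.app.name" -> "demo")
    val reloaded = Map(
      key -> "http://new-rm:8088/proxy/application_2_0001",
      "spark.executor.memory" -> "4g")

    val merged = overlayFilterParams(recovered, reloaded)
    println(merged(key))
    println(merged.contains("spark.executor.memory"))
  }
}
```

In this sketch the merged settings take the proxy filter parameter from the
current run, while unrelated keys from the reloaded settings are not copied and
other checkpointed keys are left as they were.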

Please help review, tdas, harishreedharan, vanzin. Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #9412 from jerryshao/SPARK-11457.

(cherry picked from commit 468ad0ae874d5cf55712ee976faf77f19c937ccb)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d623885
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d623885
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d623885

Branch: refs/heads/branch-1.6
Commit: 9d6238859e98651f32b9dd733a83c0e6f45978a7
Parents: 089ba81
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Nov 5 18:03:12 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Nov 6 11:57:16 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/Checkpoint.scala  | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d623885/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b7de6dd..0cd55d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -55,7 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.port",
       "spark.master",
       "spark.yarn.keytab",
-      "spark.yarn.principal")
+      "spark.yarn.principal",
+      "spark.ui.filters")
 
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
@@ -66,6 +67,16 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
         newSparkConf.set(prop, value)
       }
     }
+
+    // Add Yarn proxy filter specific configurations to the recovered SparkConf
+    val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val filterPrefix = s"spark.$filter.param."
+    newReloadConf.getAll.foreach { case (k, v) =>
+      if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
+        newSparkConf.set(k, v)
+      }
+    }
+
     newSparkConf
   }
 


