spark git commit: [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL
Repository: spark Updated Branches: refs/heads/branch-1.3 47cce984e - 5c16ced1e [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL ``` case class ClassA(value: String) val rdd = sc.parallelize(List((k1, ClassA(v1)), (k1, ClassA(v2)) )) rdd.groupByKey.collect ``` This code used to throw an exception in spark-shell, because while shuffling ```JavaSerializer``` uses ```defaultClassLoader``` which was defined like ```env.serializer.setDefaultClassLoader(urlClassLoader)```. It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like ``` override def run() { val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClassLoader) ``` in TaskRunner. When ```replClassLoader``` cannot be defined, it's identical to ```urlClassLoader``` Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com Closes #5046 from swkimme/master and squashes the following commits: fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) 6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer (cherry picked from commit f0edeae7f9ab7eae02c227be9162ec69d22c92bd) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c16ced1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c16ced1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c16ced1 Branch: refs/heads/branch-1.3 Commit: 5c16ced1e6c2dcadc0179eda8b273071254e285b Parents: 47cce98 Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com Authored: Mon Mar 16 23:49:23 
2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Mar 16 23:49:55 2015 -0700 -- .../org/apache/spark/executor/Executor.scala| 2 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 50 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 3 files changed, 63 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index bed0a08..c6ff38d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -104,7 +104,7 @@ private[spark] class Executor( private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) // Set the classloader for serializer - env.serializer.setDefaultClassLoader(urlClassLoader) + env.serializer.setDefaultClassLoader(replClassLoader) // Akka's message frame size. If task result is bigger than this, we use the block manager // to send the result back. 
http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 249f438..934daae 100644 --- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -121,9 +121,9 @@ class ReplSuite extends FunSuite { val output = runInterpreter(local, |var v = 7 -|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_) |v = 10 -|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_) .stripMargin) assertDoesNotContain(error:, output) assertDoesNotContain(Exception, output) @@ -137,7 +137,7 @@ class ReplSuite extends FunSuite { |class C { |def foo = 5 |} -|sc.parallelize(1 to 10).map(x = (new C).foo).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = (new C).foo).collect().reduceLeft(_+_) .stripMargin) assertDoesNotContain(error:, output) assertDoesNotContain(Exception, output) @@ -148,7 +148,7 @@ class ReplSuite extends FunSuite { val output = runInterpreter(local, |def
spark git commit: [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL
Repository: spark Updated Branches: refs/heads/master 9667b9f9c - f0edeae7f [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL ``` case class ClassA(value: String) val rdd = sc.parallelize(List((k1, ClassA(v1)), (k1, ClassA(v2)) )) rdd.groupByKey.collect ``` This code used to throw an exception in spark-shell, because while shuffling ```JavaSerializer``` uses ```defaultClassLoader``` which was defined like ```env.serializer.setDefaultClassLoader(urlClassLoader)```. It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like ``` override def run() { val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClassLoader) ``` in TaskRunner. When ```replClassLoader``` cannot be defined, it's identical to ```urlClassLoader``` Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com Closes #5046 from swkimme/master and squashes the following commits: fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) 6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect - collect() ) a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0edeae7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0edeae7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0edeae7 Branch: refs/heads/master Commit: f0edeae7f9ab7eae02c227be9162ec69d22c92bd Parents: 9667b9f Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com Authored: Mon Mar 16 23:49:23 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Mar 16 23:49:23 2015 -0700 -- 
.../org/apache/spark/executor/Executor.scala| 2 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 50 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 3 files changed, 63 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a897e53..6196f7b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -103,7 +103,7 @@ private[spark] class Executor( private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) // Set the classloader for serializer - env.serializer.setDefaultClassLoader(urlClassLoader) + env.serializer.setDefaultClassLoader(replClassLoader) // Akka's message frame size. If task result is bigger than this, we use the block manager // to send the result back. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 249f438..934daae 100644 --- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -121,9 +121,9 @@ class ReplSuite extends FunSuite { val output = runInterpreter(local, |var v = 7 -|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_) |v = 10 -|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_) .stripMargin) assertDoesNotContain(error:, output) assertDoesNotContain(Exception, output) @@ -137,7 +137,7 @@ class ReplSuite extends FunSuite { |class C { |def foo = 5 |} -|sc.parallelize(1 to 10).map(x = (new C).foo).collect.reduceLeft(_+_) +|sc.parallelize(1 to 10).map(x = (new C).foo).collect().reduceLeft(_+_) .stripMargin) assertDoesNotContain(error:, output) assertDoesNotContain(Exception, output) @@ -148,7 +148,7 @@ class ReplSuite extends FunSuite { val output = runInterpreter(local, |def double(x: Int) = x + x -|sc.parallelize(1 to 10).map(x = double(x)).collect.reduceLeft(_+_) +