spark git commit: [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL

2015-03-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 47cce984e -> 5c16ced1e


[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running 
groupByKey with class defined in REPL

```
case class ClassA(value: String)
val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) ))
rdd.groupByKey.collect
```
This code used to throw an exception in spark-shell, because while shuffling 
```JavaSerializer``` uses ```defaultClassLoader```, which was defined like 
```env.serializer.setDefaultClassLoader(urlClassLoader)```.

It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like
```
override def run() {
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
```
in TaskRunner.

When ```replClassLoader``` cannot be defined, it is identical to 
```urlClassLoader```.

Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com

Closes #5046 from swkimme/master and squashes the following commits:

fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer

(cherry picked from commit f0edeae7f9ab7eae02c227be9162ec69d22c92bd)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c16ced1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c16ced1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c16ced1

Branch: refs/heads/branch-1.3
Commit: 5c16ced1e6c2dcadc0179eda8b273071254e285b
Parents: 47cce98
Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com
Authored: Mon Mar 16 23:49:23 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Mar 16 23:49:55 2015 -0700

--
 .../org/apache/spark/executor/Executor.scala|  2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 
 3 files changed, 63 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bed0a08..c6ff38d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -104,7 +104,7 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
   // Set the classloader for serializer
-  env.serializer.setDefaultClassLoader(urlClassLoader)
+  env.serializer.setDefaultClassLoader(replClassLoader)
 
   // Akka's message frame size. If task result is bigger than this, we use the 
block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 249f438..934daae 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
 val output = runInterpreter(local,
   
 |var v = 7
-|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_)
 |v = 10
-|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_)
   .stripMargin)
 assertDoesNotContain(error:, output)
 assertDoesNotContain(Exception, output)
@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
 |class C {
 |def foo = 5
 |}
-|sc.parallelize(1 to 10).map(x = (new C).foo).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = (new 
C).foo).collect().reduceLeft(_+_)
   .stripMargin)
 assertDoesNotContain(error:, output)
 assertDoesNotContain(Exception, output)
@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
 val output = runInterpreter(local,
   
 |def 

spark git commit: [SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL

2015-03-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 9667b9f9c -> f0edeae7f


[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running 
groupByKey with class defined in REPL

```
case class ClassA(value: String)
val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) ))
rdd.groupByKey.collect
```
This code used to throw an exception in spark-shell, because while shuffling 
```JavaSerializer``` uses ```defaultClassLoader```, which was defined like 
```env.serializer.setDefaultClassLoader(urlClassLoader)```.

It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like
```
override def run() {
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
```
in TaskRunner.

When ```replClassLoader``` cannot be defined, it is identical to 
```urlClassLoader```.

Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com

Closes #5046 from swkimme/master and squashes the following commits:

fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0edeae7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0edeae7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0edeae7

Branch: refs/heads/master
Commit: f0edeae7f9ab7eae02c227be9162ec69d22c92bd
Parents: 9667b9f
Author: Kevin (Sangwoo) Kim sangwookim...@gmail.com
Authored: Mon Mar 16 23:49:23 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Mar 16 23:49:23 2015 -0700

--
 .../org/apache/spark/executor/Executor.scala|  2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 
 3 files changed, 63 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a897e53..6196f7b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -103,7 +103,7 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
   // Set the classloader for serializer
-  env.serializer.setDefaultClassLoader(urlClassLoader)
+  env.serializer.setDefaultClassLoader(replClassLoader)
 
   // Akka's message frame size. If task result is bigger than this, we use the 
block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 249f438..934daae 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
 val output = runInterpreter(local,
   
 |var v = 7
-|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_)
 |v = 10
-|sc.parallelize(1 to 10).map(x = v).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = v).collect().reduceLeft(_+_)
   .stripMargin)
 assertDoesNotContain(error:, output)
 assertDoesNotContain(Exception, output)
@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
 |class C {
 |def foo = 5
 |}
-|sc.parallelize(1 to 10).map(x = (new C).foo).collect.reduceLeft(_+_)
+|sc.parallelize(1 to 10).map(x = (new 
C).foo).collect().reduceLeft(_+_)
   .stripMargin)
 assertDoesNotContain(error:, output)
 assertDoesNotContain(Exception, output)
@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
 val output = runInterpreter(local,
   
 |def double(x: Int) = x + x
-|sc.parallelize(1 to 10).map(x = double(x)).collect.reduceLeft(_+_)
+