spark git commit: Revert "Once driver register successfully, stop it to connect to master."

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 5a514b61b -> cd3d937b0


Revert "Once driver register successfully, stop it to connect to master."

This reverts commit 5a514b61bbfb609c505d8d65f2483068a56f1f70.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd3d937b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd3d937b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd3d937b

Branch: refs/heads/master
Commit: cd3d937b0cc89cb5e4098f7d9f5db2712e3de71e
Parents: 5a514b6
Author: Davies Liu 
Authored: Thu Dec 17 08:01:27 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 08:01:27 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d937b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3cf7464..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,7 +130,6 @@ private[spark] class AppClient(
 if (registered.get) {
   registerMasterFutures.get.foreach(_.cancel(true))
   registerMasterThreadPool.shutdownNow()
-  registrationRetryTimer.cancel(true)
 } else if (nthRetry >= REGISTRATION_RETRIES) {
   markDead("All masters are unresponsive! Giving up.")
 } else {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12395] [SQL] fix resulting columns of outer join

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master cd3d937b0 -> a170d34a1


[SPARK-12395] [SQL] fix resulting columns of outer join

For the API DataFrame.join(right, usingColumns, joinType), if the joinType is
right_outer or full_outer, the resulting join columns could be wrong (they would
be null).

The order of columns has been changed to match that of MySQL and PostgreSQL [1].

This PR also fixes the nullability of the output for outer joins.

[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html
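
A minimal sketch of the user-visible effect (hypothetical DataFrames, 1.6-era API; not part of the patch):

```scala
// Assume a SQLContext named sqlContext; the data below is made up for illustration.
val left = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("k", "l")
val right = sqlContext.createDataFrame(Seq((2, "x"), (3, "y"))).toDF("k", "r")

// With this fix, the single "k" column of a full outer join carries the coalesced
// key from both sides instead of a possibly-null column taken from one side only.
left.join(right, Seq("k"), "full_outer").show()
// Expected columns: k, l, r with k = 1, 2, 3 (no null keys)
```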

Author: Davies Liu 

Closes #10353 from davies/fix_join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a170d34a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a170d34a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a170d34a

Branch: refs/heads/master
Commit: a170d34a1b309fecc76d1370063e0c4f44dc2142
Parents: cd3d937
Author: Davies Liu 
Authored: Thu Dec 17 08:04:11 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 08:04:11 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 25 
 .../apache/spark/sql/DataFrameJoinSuite.scala   | 20 
 2 files changed, 36 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a170d34a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6250e95..d741312 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
@@ -455,10 +455,8 @@ class DataFrame private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), None)).analyzed.asInstanceOf[Join]
 
-    // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(
         withPlan(joined.left).resolve(col),
@@ -467,9 +465,26 @@ class DataFrame private[sql](
       catalyst.expressions.And(cond, eqTo)
     }
 
+    // Project only one of the join columns.
+    val joinedCols = JoinType(joinType) match {
+      case Inner | LeftOuter | LeftSemi =>
+        usingColumns.map(col => withPlan(joined.left).resolve(col))
+      case RightOuter =>
+        usingColumns.map(col => withPlan(joined.right).resolve(col))
+      case FullOuter =>
+        usingColumns.map { col =>
+          val leftCol = withPlan(joined.left).resolve(col)
+          val rightCol = withPlan(joined.right).resolve(col)
+          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
+        }
+    }
+    // The nullability of output of joined could be different than original column,
+    // so we can only compare them by exprId
+    val joinRefs = condition.map(_.references.toSeq.map(_.exprId)).getOrElse(Nil)
+    val resultCols = joinedCols ++ joined.output.filterNot(e => joinRefs.contains(e.exprId))
     withPlan {
       Project(
-        joined.output.filterNot(joinedCols.contains(_)),
+        resultCols,
         Join(
           joined.left,
           joined.right,

http://git-wip-us.apache.org/repos/asf/spark/blob/a170d34a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index c70397f..39a6541 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ 

spark git commit: [SPARK-12395] [SQL] fix resulting columns of outer join

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a8466489a -> 1ebedb20f


[SPARK-12395] [SQL] fix resulting columns of outer join

For the API DataFrame.join(right, usingColumns, joinType), if the joinType is
right_outer or full_outer, the resulting join columns could be wrong (they would
be null).

The order of columns has been changed to match that of MySQL and PostgreSQL [1].

This PR also fixes the nullability of the output for outer joins.

[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html

Author: Davies Liu 

Closes #10353 from davies/fix_join.

(cherry picked from commit a170d34a1b309fecc76d1370063e0c4f44dc2142)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ebedb20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ebedb20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ebedb20

Branch: refs/heads/branch-1.6
Commit: 1ebedb20f2c5b781eafa9bf2b5ab092d744cc4fd
Parents: a846648
Author: Davies Liu 
Authored: Thu Dec 17 08:04:11 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 08:04:24 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 25 
 .../apache/spark/sql/DataFrameJoinSuite.scala   | 20 
 2 files changed, 36 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ebedb20/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index cc8b70b..74f9370 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
@@ -499,10 +499,8 @@ class DataFrame private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), None)).analyzed.asInstanceOf[Join]
 
-    // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(
         withPlan(joined.left).resolve(col),
@@ -511,9 +509,26 @@ class DataFrame private[sql](
       catalyst.expressions.And(cond, eqTo)
     }
 
+    // Project only one of the join columns.
+    val joinedCols = JoinType(joinType) match {
+      case Inner | LeftOuter | LeftSemi =>
+        usingColumns.map(col => withPlan(joined.left).resolve(col))
+      case RightOuter =>
+        usingColumns.map(col => withPlan(joined.right).resolve(col))
+      case FullOuter =>
+        usingColumns.map { col =>
+          val leftCol = withPlan(joined.left).resolve(col)
+          val rightCol = withPlan(joined.right).resolve(col)
+          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
+        }
+    }
+    // The nullability of output of joined could be different than original column,
+    // so we can only compare them by exprId
+    val joinRefs = condition.map(_.references.toSeq.map(_.exprId)).getOrElse(Nil)
+    val resultCols = joinedCols ++ joined.output.filterNot(e => joinRefs.contains(e.exprId))
     withPlan {
       Project(
-        joined.output.filterNot(joinedCols.contains(_)),
+        resultCols,
         Join(
           joined.left,
           joined.right,

http://git-wip-us.apache.org/repos/asf/spark/blob/1ebedb20/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 

spark git commit: Revert "Once driver register successfully, stop it to connect to master."

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 da7542f24 -> a8466489a


Revert "Once driver register successfully, stop it to connect to master."

This reverts commit da7542f2408140a9a3b7ea245350976ac18676a5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8466489
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8466489
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8466489

Branch: refs/heads/branch-1.6
Commit: a8466489ab01e59fe07ba20adfc3983ec6928157
Parents: da7542f
Author: Davies Liu 
Authored: Thu Dec 17 08:01:59 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 08:01:59 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8466489/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3cf7464..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,7 +130,6 @@ private[spark] class AppClient(
 if (registered.get) {
   registerMasterFutures.get.foreach(_.cancel(true))
   registerMasterThreadPool.shutdownNow()
-  registrationRetryTimer.cancel(true)
 } else if (nthRetry >= REGISTRATION_RETRIES) {
   markDead("All masters are unresponsive! Giving up.")
 } else {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Once driver register successfully, stop it to connect to master.

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d509194b8 -> da7542f24


Once driver register successfully, stop it to connect to master.

This commit resolves SPARK-12396.

Author: echo2mei <534384...@qq.com>

Closes #10354 from echoTomei/master.

(cherry picked from commit 5a514b61bbfb609c505d8d65f2483068a56f1f70)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da7542f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da7542f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da7542f2

Branch: refs/heads/branch-1.6
Commit: da7542f2408140a9a3b7ea245350976ac18676a5
Parents: d509194
Author: echo2mei <534384...@qq.com>
Authored: Thu Dec 17 07:59:17 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 07:59:27 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/da7542f2/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1e2f469..3cf7464 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,6 +130,7 @@ private[spark] class AppClient(
 if (registered.get) {
   registerMasterFutures.get.foreach(_.cancel(true))
   registerMasterThreadPool.shutdownNow()
+  registrationRetryTimer.cancel(true)
 } else if (nthRetry >= REGISTRATION_RETRIES) {
   markDead("All masters are unresponsive! Giving up.")
 } else {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Once driver register successfully, stop it to connect to master.

2015-12-17 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 9d66c4216 -> 5a514b61b


Once driver register successfully, stop it to connect to master.

This commit resolves SPARK-12396.

Author: echo2mei <534384...@qq.com>

Closes #10354 from echoTomei/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a514b61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a514b61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a514b61

Branch: refs/heads/master
Commit: 5a514b61bbfb609c505d8d65f2483068a56f1f70
Parents: 9d66c42
Author: echo2mei <534384...@qq.com>
Authored: Thu Dec 17 07:59:17 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 17 07:59:17 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5a514b61/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1e2f469..3cf7464 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,6 +130,7 @@ private[spark] class AppClient(
 if (registered.get) {
   registerMasterFutures.get.foreach(_.cancel(true))
   registerMasterThreadPool.shutdownNow()
+  registrationRetryTimer.cancel(true)
 } else if (nthRetry >= REGISTRATION_RETRIES) {
   markDead("All masters are unresponsive! Giving up.")
 } else {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

2015-12-17 Thread sarutak
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1fbca4120 -> 881f2544e


[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

Fix a problem with #10332; this one should fix cluster mode on Mesos.
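
The one-character change is `filter` -> `filterKeys` on the environment-variable map. A toy sketch (not Spark code) of why that matters on a Scala `Map`:

```scala
val env = Map("SPARK_HOME" -> "/opt/spark", "JAVA_HOME" -> "/usr/lib/jvm")

// filter sees (key, value) pairs; no pair equals the string "SPARK_HOME",
// so the predicate is always true and nothing is removed.
val broken = env.filter(!_.equals("SPARK_HOME"))
// filterKeys tests only the keys, which is what the patch intends.
val fixed = env.filterKeys(!_.equals("SPARK_HOME"))

println(broken.keySet)  // Set(SPARK_HOME, JAVA_HOME)
println(fixed.keySet)   // Set(JAVA_HOME)
```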

Author: Iulian Dragos 

Closes #10359 from dragos/issue/fix-spark-12345-one-more-time.

(cherry picked from commit 8184568810e8a2e7d5371db2c6a0366ef4841f70)
Signed-off-by: Kousuke Saruta 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881f2544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881f2544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881f2544

Branch: refs/heads/branch-1.6
Commit: 881f2544e13679c185a7c34ddb82e885aaa79813
Parents: 1fbca41
Author: Iulian Dragos 
Authored: Fri Dec 18 03:19:31 2015 +0900
Committer: Kousuke Saruta 
Committed: Fri Dec 18 03:37:43 2015 +0900

--
 .../scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/881f2544/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 7c01ae4..196338f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
     // look for files in SPARK_HOME instead. We only need the ability to specify where to find
     // spark-submit script which user can user spark.executor.home or spark.home configurations
     // (SPARK-12345).
-    val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
+    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
 // Construct driver description


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 6e0771665 -> 86e405f35


[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special 
characters

This PR encodes and decodes the file name to fix the issue.
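
A rough sketch of the underlying idea, using only java.net.URI (the helper names here are illustrative, not necessarily the ones added by the PR):

```scala
import java.net.URI

// Percent-encode a bare file name for use in a URI path. The dummy scheme/host keep
// URI from misreading the name, and the leading "/" makes it a valid path component.
def encodeFileName(fileName: String): String =
  new URI("file", null, "localhost", -1, "/" + fileName, null, null).getRawPath.substring(1)

// Decode it back when resolving the file on the other side.
def decodeFileName(rawPath: String): String = new URI(rawPath).getPath

println(encodeFileName("my file.txt"))    // my%20file.txt
println(decodeFileName("my%20file.txt"))  // my file.txt
```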

Author: Shixiong Zhu 

Closes #10208 from zsxwing/uri.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86e405f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86e405f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86e405f3

Branch: refs/heads/master
Commit: 86e405f357711ae93935853a912bc13985c259db
Parents: 6e07716
Author: Shixiong Zhu 
Authored: Thu Dec 17 09:55:37 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 09:55:37 2015 -0800

--
 .../scala/org/apache/spark/HttpFileServer.scala |  6 ++---
 .../spark/rpc/netty/NettyStreamManager.scala|  5 ++--
 .../scala/org/apache/spark/util/Utils.scala | 26 +++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala  |  4 +++
 .../org/apache/spark/util/UtilsSuite.scala  | 11 +
 5 files changed, 46 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/HttpFileServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala 
b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 77d8ec9..46f9f9e 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
 
   def addFile(file: File) : String = {
 addFileToDir(file, fileDir)
-serverUri + "/files/" + file.getName
+serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addJar(file: File) : String = {
 addFileToDir(file, jarDir)
-serverUri + "/jars/" + file.getName
+serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addDirectory(path: String, resourceBase: String): String = {
@@ -85,7 +85,7 @@ private[spark] class HttpFileServer(
   throw new IllegalArgumentException(s"$file cannot be a directory.")
 }
 Files.copy(file, new File(dir, file.getName))
-dir + "/" + file.getName
+dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index ecd9697..394cde4 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.network.server.StreamManager
 import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
@@ -64,13 +65,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   override def addFile(file: File): String = {
 require(files.putIfAbsent(file.getName(), file) == null,
   s"File ${file.getName()} already registered.")
-s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
 require(jars.putIfAbsent(file.getName(), file) == null,
   s"JAR ${file.getName()} already registered.")
-s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addDirectory(baseUri: String, path: File): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9dbe66e..fce89df 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -331,6 +331,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * A file name may contain some invalid URI characters, such as " ". This 
method will convert 

spark git commit: [SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 41ad8aced -> 1fbca4120


[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special 
characters

This PR encodes and decodes the file name to fix the issue.

Author: Shixiong Zhu 

Closes #10208 from zsxwing/uri.

(cherry picked from commit 86e405f357711ae93935853a912bc13985c259db)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fbca412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fbca412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fbca412

Branch: refs/heads/branch-1.6
Commit: 1fbca41200d6e73cb276d5949b894881c700323f
Parents: 41ad8ac
Author: Shixiong Zhu 
Authored: Thu Dec 17 09:55:37 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 09:55:46 2015 -0800

--
 .../scala/org/apache/spark/HttpFileServer.scala |  6 ++---
 .../spark/rpc/netty/NettyStreamManager.scala|  5 ++--
 .../scala/org/apache/spark/util/Utils.scala | 26 +++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala  |  4 +++
 .../org/apache/spark/util/UtilsSuite.scala  | 11 +
 5 files changed, 46 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/HttpFileServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala 
b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 7cf7bc0..ee9bfea 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
 
   def addFile(file: File) : String = {
 addFileToDir(file, fileDir)
-serverUri + "/files/" + file.getName
+serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addJar(file: File) : String = {
 addFileToDir(file, jarDir)
-serverUri + "/jars/" + file.getName
+serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addFileToDir(file: File, dir: File) : String = {
@@ -80,7 +80,7 @@ private[spark] class HttpFileServer(
   throw new IllegalArgumentException(s"$file cannot be a directory.")
 }
 Files.copy(file, new File(dir, file.getName))
-dir + "/" + file.getName
+dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index a2768b4..5343482 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.network.server.StreamManager
 import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
@@ -51,13 +52,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   override def addFile(file: File): String = {
 require(files.putIfAbsent(file.getName(), file) == null,
   s"File ${file.getName()} already registered.")
-s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
 require(jars.putIfAbsent(file.getName(), file) == null,
   s"JAR ${file.getName()} already registered.")
-s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index af63234..6cb52fb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -318,6 +318,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * A file name may contain some invalid URI 

spark git commit: [SQL] Update SQLContext.read.text doc

2015-12-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a170d34a1 -> 6e0771665


[SQL] Update SQLContext.read.text doc

Since we renamed the column from ```text``` to ```value``` for DataFrames loaded by
```SQLContext.read.text```, we need to update the docs.
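
A quick usage sketch of the renamed column (1.6-era API; the file path is made up):

```scala
val lines = sqlContext.read.text("/tmp/input.txt")
lines.printSchema()
// root
//  |-- value: string (nullable = true)
lines.select("value").show(5)  // each line of the file is one row in "value"
```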

Author: Yanbo Liang 

Closes #10349 from yanboliang/text-value.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e077166
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e077166
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e077166

Branch: refs/heads/master
Commit: 6e0771665b3c9330fc0a5b2c7740a796b4cd712e
Parents: a170d34
Author: Yanbo Liang 
Authored: Thu Dec 17 09:19:46 2015 -0800
Committer: Reynold Xin 
Committed: Thu Dec 17 09:19:46 2015 -0800

--
 python/pyspark/sql/readwriter.py   | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +-
 .../spark/sql/execution/datasources/text/DefaultSource.scala   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2e75f0c..a3d7eca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -207,7 +207,7 @@ class DataFrameReader(object):
 @ignore_unicode_prefix
 @since(1.6)
 def text(self, paths):
-"""Loads a text file and returns a [[DataFrame]] with a single string 
column named "text".
+"""Loads a text file and returns a [[DataFrame]] with a single string 
column named "value".
 
 Each line in the text file is a new row in the resulting DataFrame.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3ed1e55..c1a8f19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -339,7 +339,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) 
extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
* Each line in the text file is a new row in the resulting DataFrame. For 
example:
* {{{
*   // Scala:

http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fbd387b..4a1cbe4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -76,7 +76,7 @@ private[sql] class TextRelation(
 (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
-  /** Data schema is always a single column, named "text". */
+  /** Data schema is always a single column, named "value". */
   override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SQL] Update SQLContext.read.text doc

2015-12-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1ebedb20f -> 41ad8aced


[SQL] Update SQLContext.read.text doc

Since we renamed the column from ```text``` to ```value``` for DataFrames loaded by
```SQLContext.read.text```, we need to update the docs.

Author: Yanbo Liang 

Closes #10349 from yanboliang/text-value.

(cherry picked from commit 6e0771665b3c9330fc0a5b2c7740a796b4cd712e)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41ad8ace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41ad8ace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41ad8ace

Branch: refs/heads/branch-1.6
Commit: 41ad8aced2fc6c694c15e9465cfa34517b2395e8
Parents: 1ebedb2
Author: Yanbo Liang 
Authored: Thu Dec 17 09:19:46 2015 -0800
Committer: Reynold Xin 
Committed: Thu Dec 17 09:20:04 2015 -0800

--
 python/pyspark/sql/readwriter.py   | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +-
 .../spark/sql/execution/datasources/text/DefaultSource.scala   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2e75f0c..a3d7eca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -207,7 +207,7 @@ class DataFrameReader(object):
 @ignore_unicode_prefix
 @since(1.6)
 def text(self, paths):
-"""Loads a text file and returns a [[DataFrame]] with a single string 
column named "text".
+"""Loads a text file and returns a [[DataFrame]] with a single string 
column named "value".
 
 Each line in the text file is a new row in the resulting DataFrame.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3ed1e55..c1a8f19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -339,7 +339,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) 
extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
* Each line in the text file is a new row in the resulting DataFrame. For 
example:
* {{{
*   // Scala:

http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fbd387b..4a1cbe4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -76,7 +76,7 @@ private[sql] class TextRelation(
 (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
-  /** Data schema is always a single column, named "text". */
+  /** Data schema is always a single column, named "value". */
   override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

2015-12-17 Thread sarutak
Repository: spark
Updated Branches:
  refs/heads/master 86e405f35 -> 818456881


[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

Fix a problem with #10332; this one should fix cluster mode on Mesos.

Author: Iulian Dragos 

Closes #10359 from dragos/issue/fix-spark-12345-one-more-time.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81845688
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81845688
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81845688

Branch: refs/heads/master
Commit: 8184568810e8a2e7d5371db2c6a0366ef4841f70
Parents: 86e405f
Author: Iulian Dragos 
Authored: Fri Dec 18 03:19:31 2015 +0900
Committer: Kousuke Saruta 
Committed: Fri Dec 18 03:19:31 2015 +0900

--
 .../scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81845688/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 24510db..c0b9359 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
     // cause spark-submit script to look for files in SPARK_HOME instead.
     // We only need the ability to specify where to find spark-submit script
     // which user can user spark.executor.home or spark.home configurations.
-    val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
+    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
 // Construct driver description


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

2015-12-17 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 881f2544e -> 88bbb5429


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended; this only changes an internal API.

Author: Andrew Or 

Closes #10343 from andrewor14/clean-bm-serializer.

Conflicts:
core/src/main/scala/org/apache/spark/storage/BlockManager.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbb542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbb542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbb542

Branch: refs/heads/branch-1.6
Commit: 88bbb5429dd3efcff6b2835a70143247b08ae6b2
Parents: 881f254
Author: Andrew Or 
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or 
Committed: Thu Dec 17 12:01:13 2015 -0800

--
 .../org/apache/spark/storage/BlockManager.scala | 29 
 .../org/apache/spark/storage/DiskStore.scala| 10 ---
 2 files changed, 11 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..2cc2fd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     ByteBuffer.wrap(byteStream.toByteArray)
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
 getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, 
buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as 
part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): 

spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 43f02e41e -> 28adc45d5


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".
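
The pitfall in miniature (plain Scala, mirroring the two call sites touched here):

```scala
// String.split takes a regex: an unescaped "|" matches the empty string and
// splits between every character, and an unescaped "." matches every character.
"Action|Comedy".split("|").toList    // one element per character -- wrong
"Action|Comedy".split("\\|").toList  // List(Action, Comedy)

"org.apache.spark.Foo".split(".").toList    // List() -- every field is empty and dropped
"org.apache.spark.Foo".split("\\.").toList  // List(org, apache, spark, Foo)
```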

Author: Shixiong Zhu 

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28adc45d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28adc45d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28adc45d

Branch: refs/heads/branch-1.4
Commit: 28adc45d5dfe2f979ed9e9a63cff108fcc885bce
Parents: 43f02e4
Author: Shixiong Zhu 
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 13:24:19 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/28adc45d/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 25f2111..446e3eb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -49,7 +49,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/28adc45d/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index fe6328b..68f8ea8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -231,7 +231,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0fdf5542b -> a8d14cc06


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu 

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8d14cc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8d14cc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8d14cc0

Branch: refs/heads/branch-1.5
Commit: a8d14cc0623232cc136102b0161e19fcf69a7323
Parents: 0fdf554
Author: Shixiong Zhu 
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 13:24:08 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8d14cc0/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a8d14cc0/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index fe6328b..68f8ea8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -231,7 +231,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e096a652b -> ed6ebda5c


[SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in 
assertOrderInvariantEquals method

org.apache.spark.streaming.Java8APISuite.java is failing because it tries to sort an
immutable list in the assertOrderInvariantEquals method.
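
The fix sorts copies of the per-batch lists instead of the inputs. A minimal Scala sketch of the same idea (illustrative helper, not the suite's code):

```scala
import java.util.{ArrayList => JArrayList, Collections, List => JList}

// Sort copies of the incoming lists; the originals may be immutable, in which case
// Collections.sort on them would throw UnsupportedOperationException.
def sortedCopies[T <: Comparable[T]](lists: JList[JList[T]]): JList[JList[T]] = {
  val out = new JArrayList[JList[T]]()
  val it = lists.iterator()
  while (it.hasNext) {
    val copy = new JArrayList[T](it.next())  // defensive copy is always mutable
    Collections.sort(copy)
    out.add(copy)
  }
  out
}
```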

Author: Evan Chen 

Closes #10336 from evanyc15/SPARK-12376-StreamingJavaAPISuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed6ebda5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed6ebda5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed6ebda5

Branch: refs/heads/master
Commit: ed6ebda5c898bad76194fe3a090bef5a14f861c2
Parents: e096a65
Author: Evan Chen 
Authored: Thu Dec 17 14:22:30 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 14:22:30 2015 -0800

--
 .../java/org/apache/spark/streaming/Java8APISuite.java   | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed6ebda5/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
--
diff --git 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 89e0c7f..e8a0dfc 100644
--- 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -439,9 +439,14 @@ public class Java8APISuite extends 
LocalJavaStreamingContext implements Serializ
*/
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach((List<T> list) -> Collections.sort(list));
-    actual.forEach((List<T> list) -> Collections.sort(list));
-    Assert.assertEquals(expected, actual);
+    expected.forEach(list -> Collections.sort(list));
+    List<List<T>> sortedActual = new ArrayList<>();
+    actual.forEach(list -> {
+      List<T> sortedList = new ArrayList<>(list);
+      Collections.sort(sortedList);
+      sortedActual.add(sortedList);
+    });
+    Assert.assertEquals(expected, sortedActual);
   }
 
   @Test


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 48dcee484 -> 4df1dd403


[SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in 
assertOrderInvariantEquals method

org.apache.spark.streaming.Java8APISuite.java is failing because it tries to sort an
immutable list in the assertOrderInvariantEquals method.

Author: Evan Chen 

Closes #10336 from evanyc15/SPARK-12376-StreamingJavaAPISuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4df1dd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4df1dd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4df1dd40

Branch: refs/heads/branch-1.6
Commit: 4df1dd403441a4e4ca056d294385d8d0d8a0c65d
Parents: 48dcee4
Author: Evan Chen 
Authored: Thu Dec 17 14:22:30 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 14:23:45 2015 -0800

--
 .../java/org/apache/spark/streaming/Java8APISuite.java   | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4df1dd40/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
--
diff --git 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 89e0c7f..e8a0dfc 100644
--- 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -439,9 +439,14 @@ public class Java8APISuite extends 
LocalJavaStreamingContext implements Serializ
*/
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach((List<T> list) -> Collections.sort(list));
-    actual.forEach((List<T> list) -> Collections.sort(list));
-    Assert.assertEquals(expected, actual);
+    expected.forEach(list -> Collections.sort(list));
+    List<List<T>> sortedActual = new ArrayList<>();
+    actual.forEach(list -> {
+      List<T> sortedList = new ArrayList<>(list);
+      Collections.sort(sortedList);
+      sortedActual.add(sortedList);
+    });
+    Assert.assertEquals(expected, sortedActual);
   }
 
   @Test


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 818456881 -> 540b5aead


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu 

Closes #10361 from zsxwing/reg-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540b5aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540b5aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540b5aea

Branch: refs/heads/master
Commit: 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9
Parents: 8184568
Author: Shixiong Zhu 
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 13:23:48 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 88bbb5429 -> c0ab14fbe


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu 

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ab14fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ab14fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ab14fb

Branch: refs/heads/branch-1.6
Commit: c0ab14fbeab2a81d174c3643a4fcc915ff2902e8
Parents: 88bbb54
Author: Shixiong Zhu 
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 13:23:58 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
    val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-   stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+   stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12397][SQL] Improve error messages for data sources when they are not found

2015-12-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 540b5aead -> e096a652b


[SPARK-12397][SQL] Improve error messages for data sources when they are not 
found

Point users to spark-packages.org to find them.
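
For illustration only (not part of the patch): in spark-shell, where `sqlContext` is predefined, a read without the spark-avro package on the classpath would now fail with the more helpful message added in the diff below. The format call and path are assumed usage; the message text is taken from the patch.

  // Assumed usage: Avro requested but databricks/spark-avro is not on the classpath.
  sqlContext.read
    .format("com.databricks.spark.avro")
    .load("/path/to/events.avro")   // hypothetical path
  // java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.avro.
  //   Please use Spark package http://spark-packages.org/package/databricks/spark-avro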

Author: Reynold Xin 

Closes #10351 from rxin/SPARK-12397.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e096a652
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e096a652
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e096a652

Branch: refs/heads/master
Commit: e096a652b92fc64a7b3457cd0766ab324bcc980b
Parents: 540b5ae
Author: Reynold Xin 
Authored: Thu Dec 17 14:16:49 2015 -0800
Committer: Michael Armbrust 
Committed: Thu Dec 17 14:16:49 2015 -0800

--
 .../datasources/ResolvedDataSource.scala| 50 +---
 .../sql/sources/ResolvedDataSourceSuite.scala   | 17 +++
 2 files changed, 49 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e096a652/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 86a306b..e02ee6c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -57,24 +57,38 @@ object ResolvedDataSource extends Logging {
 val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
 
 
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList 
match {
-  /** the provider format did not match any given registered aliases */
-  case Nil => 
Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-case Success(dataSource) => dataSource
-case Failure(error) =>
-  if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-throw new ClassNotFoundException(
-  "The ORC data source must be used with Hive support enabled.", 
error)
-  } else {
-throw new ClassNotFoundException(
-  s"Failed to load class for data source: $provider.", error)
-  }
-  }
-  /** there is exactly one registered alias */
-  case head :: Nil => head.getClass
-  /** There are multiple registered aliases for the input */
-  case sources => sys.error(s"Multiple sources found for $provider, " +
-s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-"please specify the fully qualified class name.")
+  // the provider format did not match any given registered aliases
+  case Nil =>
+
Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+  case Success(dataSource) =>
+// Found the data source using fully qualified path
+dataSource
+  case Failure(error) =>
+if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+  throw new ClassNotFoundException(
+"The ORC data source must be used with Hive support enabled.", 
error)
+} else {
+  if (provider == "avro" || provider == "com.databricks.spark.avro") {
+    throw new ClassNotFoundException(
+      s"Failed to find data source: $provider. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro",
+      error)
+  } else {
+    throw new ClassNotFoundException(
+      s"Failed to find data source: $provider. Please find packages at " +
+      "http://spark-packages.org",
+      error)
+  }
+}
+}
+  case head :: Nil =>
+// there is exactly one registered alias
+head.getClass
+  case sources =>
+// There are multiple registered aliases for the input
+sys.error(s"Multiple sources found for $provider " +
+  s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+  "please specify the fully qualified class name.")
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e096a652/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
 

spark git commit: [SPARK-12397][SQL] Improve error messages for data sources when they are not found

2015-12-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c0ab14fbe -> 48dcee484


[SPARK-12397][SQL] Improve error messages for data sources when they are not 
found

Point users to spark-packages.org to find them.

Author: Reynold Xin 

Closes #10351 from rxin/SPARK-12397.

(cherry picked from commit e096a652b92fc64a7b3457cd0766ab324bcc980b)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48dcee48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48dcee48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48dcee48

Branch: refs/heads/branch-1.6
Commit: 48dcee48416d87bf9572ace0a82285bacfcbf46e
Parents: c0ab14f
Author: Reynold Xin 
Authored: Thu Dec 17 14:16:49 2015 -0800
Committer: Michael Armbrust 
Committed: Thu Dec 17 14:16:58 2015 -0800

--
 .../datasources/ResolvedDataSource.scala| 50 +---
 .../sql/sources/ResolvedDataSourceSuite.scala   | 17 +++
 2 files changed, 49 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/48dcee48/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 86a306b..e02ee6c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -57,24 +57,38 @@ object ResolvedDataSource extends Logging {
 val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
 
 
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList 
match {
-  /** the provider format did not match any given registered aliases */
-  case Nil => 
Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-case Success(dataSource) => dataSource
-case Failure(error) =>
-  if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-throw new ClassNotFoundException(
-  "The ORC data source must be used with Hive support enabled.", 
error)
-  } else {
-throw new ClassNotFoundException(
-  s"Failed to load class for data source: $provider.", error)
-  }
-  }
-  /** there is exactly one registered alias */
-  case head :: Nil => head.getClass
-  /** There are multiple registered aliases for the input */
-  case sources => sys.error(s"Multiple sources found for $provider, " +
-s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-"please specify the fully qualified class name.")
+  // the provider format did not match any given registered aliases
+  case Nil =>
+
Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+  case Success(dataSource) =>
+// Found the data source using fully qualified path
+dataSource
+  case Failure(error) =>
+if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+  throw new ClassNotFoundException(
+"The ORC data source must be used with Hive support enabled.", 
error)
+} else {
+  if (provider == "avro" || provider == "com.databricks.spark.avro") {
+    throw new ClassNotFoundException(
+      s"Failed to find data source: $provider. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro",
+      error)
+  } else {
+    throw new ClassNotFoundException(
+      s"Failed to find data source: $provider. Please find packages at " +
+      "http://spark-packages.org",
+      error)
+  }
+}
+}
+  case head :: Nil =>
+// there is exactly one registered alias
+head.getClass
+  case sources =>
+// There are multiple registered aliases for the input
+sys.error(s"Multiple sources found for $provider " +
+  s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+  "please specify the fully qualified class name.")
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/48dcee48/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
--
diff --git 

[2/2] spark git commit: [SPARK-8641][SQL] Native Spark Window functions

2015-12-17 Thread yhuai
[SPARK-8641][SQL] Native Spark Window functions

This PR removes Hive window functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:
* Better memory management.
* The ability to use Spark UDAFs in Window functions.
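
To make the window support concrete, a small hedged usage sketch of the DataFrame window API this PR now evaluates natively (column names and rows are made up; the API used here -- Window, rank, over -- is the one exercised in the new DataFrameWindowSuite). In spark-shell, where sqlContext is predefined:

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  // Hypothetical data: (department, salary).
  val df = Seq(("eng", 120), ("eng", 100), ("sales", 95), ("sales", 90)).toDF("dept", "salary")

  // Rank rows within each department by salary; after this PR the ranking is
  // computed by the native Window operator instead of a Hive window UDAF.
  val byDept = Window.partitionBy("dept").orderBy(desc("salary"))
  df.select(col("dept"), col("salary"), rank().over(byDept).as("rank")).show()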

cc rxin / yhuai

Author: Herman van Hovell 

Closes #9819 from hvanhovell/SPARK-8641-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658f66e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658f66e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658f66e6

Branch: refs/heads/master
Commit: 658f66e6208a52367e3b43a6fee9c90f33fb6226
Parents: ed6ebda
Author: Herman van Hovell 
Authored: Thu Dec 17 15:16:35 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 17 15:16:35 2015 -0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  81 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  39 +-
 .../catalyst/analysis/FunctionRegistry.scala|  12 +-
 .../expressions/aggregate/interfaces.scala  |   7 +-
 .../expressions/windowExpressions.scala | 318 +++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  |  31 +-
 .../org/apache/spark/sql/execution/Window.scala | 649 +++
 .../spark/sql/expressions/WindowSpec.scala  |  54 +-
 .../scala/org/apache/spark/sql/functions.scala  |  16 +-
 .../apache/spark/sql/DataFrameWindowSuite.scala | 295 +
 .../HiveWindowFunctionQuerySuite.scala  |  10 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   1 -
 .../org/apache/spark/sql/hive/HiveQl.scala  |  22 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala| 224 ---
 .../sql/hive/HiveDataFrameWindowSuite.scala | 259 
 .../sql/hive/execution/WindowQuerySuite.scala   | 230 +++
 16 files changed, 1325 insertions(+), 923 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ca00a5e..64dd83a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -77,6 +77,8 @@ class Analyzer(
   ResolveGenerate ::
   ResolveFunctions ::
   ResolveAliases ::
+  ResolveWindowOrder ::
+  ResolveWindowFrame ::
   ExtractWindowExpressions ::
   GlobalAggregates ::
   ResolveAggregateFunctions ::
@@ -127,14 +129,12 @@ class Analyzer(
   // Lookup WindowSpecDefinitions. This rule works with unresolved 
children.
   case WithWindowDefinition(windowDefinitions, child) =>
 child.transform {
-  case plan => plan.transformExpressions {
+  case p => p.transformExpressions {
 case UnresolvedWindowExpression(c, 
WindowSpecReference(windowName)) =>
   val errorMessage =
 s"Window specification $windowName is not defined in the 
WINDOW clause."
   val windowSpecDefinition =
-windowDefinitions
-  .get(windowName)
-  .getOrElse(failAnalysis(errorMessage))
+windowDefinitions.getOrElse(windowName, 
failAnalysis(errorMessage))
   WindowExpression(c, windowSpecDefinition)
   }
 }
@@ -577,6 +577,10 @@ class Analyzer(
   AggregateExpression(max, Complete, isDistinct = false)
 case min: Min if isDistinct =>
   AggregateExpression(min, Complete, isDistinct = false)
+// AggregateWindowFunctions are AggregateFunctions that can 
only be evaluated within
+// the context of a Window clause. They do not need to be 
wrapped in an
+// AggregateExpression.
+case wf: AggregateWindowFunction => wf
 // We get an aggregate function, we need to wrap it in an 
AggregateExpression.
 case agg: AggregateFunction => AggregateExpression(agg, 
Complete, isDistinct)
 // This function is not an aggregate function, just return the 
resolved one.
@@ -597,11 +601,17 @@ class Analyzer(
 }
 
 def containsAggregates(exprs: Seq[Expression]): Boolean = {
-  exprs.foreach(_.foreach {
-case agg: AggregateExpression => return true
-case _ =>
-  })
-  false
+  // Collect all Windowed Aggregate Expressions.
+  val windowedAggExprs = 

spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 658f66e62 -> f4346f612


[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when 
recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and runs the restore only when it is `false`.
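
Reduced to a minimal hedged sketch outside of DStream (the class and field names here are illustrative; the real change is in the diff below), the guard pattern looks like this:

  // The flag is @transient, so an instance recovered from a checkpoint starts
  // with restored = false and performs the restore exactly once, even if
  // restoreCheckpointData() is reached again through another dependency path.
  class Restorable(dependencies: Seq[Restorable]) {
    @transient private var restored = false

    def restoreCheckpointData(): Unit = {
      if (!restored) {
        // ... recreate RDDs from the checkpoint data here ...
        dependencies.foreach(_.restoreCheckpointData())
        restored = true
      }
    }
  }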

Author: jhu-chang 

Closes #9765 from jhu-chang/SPARK-11749.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4346f61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4346f61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4346f61

Branch: refs/heads/master
Commit: f4346f612b6798517153a786f9172cf41618d34d
Parents: 658f66e
Author: jhu-chang 
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 17:53:15 2015 -0800

--
 .../spark/streaming/dstream/DStream.scala   | 15 --
 .../spark/streaming/CheckpointSuite.scala   | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
* override the updateCheckpointData() method would also need to override 
this method.
*/
   private[streaming] def restoreCheckpointData() {
-// Create RDDs from the checkpoint data
-logInfo("Restoring checkpoint data")
-checkpointData.restore()
-dependencies.foreach(_.restoreCheckpointData())
-logInfo("Restored checkpoint data")
+if (!restoredFromCheckpointData) {
+  // Create RDDs from the checkpoint data
+  logInfo("Restoring checkpoint data")
+  checkpointData.restore()
+  dependencies.foreach(_.restoreCheckpointData())
+  restoredFromCheckpointData = true
+  logInfo("Restored checkpoint data")
+}
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ObjectOutputStream, ByteArrayOutputStream, 
ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, 
ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends 
InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new 
FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  override def stop(): Unit = { }
+  override def compute(time: Time): Option[RDD[Int]] = 
Some(ssc.sc.makeRDD(Seq(1)))
+  private[streaming]

spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4df1dd403 -> 9177ea383


[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when 
recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and runs the restore only when it is `false`.

Author: jhu-chang 

Closes #9765 from jhu-chang/SPARK-11749.

(cherry picked from commit f4346f612b6798517153a786f9172cf41618d34d)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9177ea38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9177ea38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9177ea38

Branch: refs/heads/branch-1.6
Commit: 9177ea383a29653f0591a59e1ee2dff6b87d5a1c
Parents: 4df1dd4
Author: jhu-chang 
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 17:54:14 2015 -0800

--
 .../spark/streaming/dstream/DStream.scala   | 15 --
 .../spark/streaming/CheckpointSuite.scala   | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
* override the updateCheckpointData() method would also need to override 
this method.
*/
   private[streaming] def restoreCheckpointData() {
-// Create RDDs from the checkpoint data
-logInfo("Restoring checkpoint data")
-checkpointData.restore()
-dependencies.foreach(_.restoreCheckpointData())
-logInfo("Restored checkpoint data")
+if (!restoredFromCheckpointData) {
+  // Create RDDs from the checkpoint data
+  logInfo("Restoring checkpoint data")
+  checkpointData.restore()
+  dependencies.foreach(_.restoreCheckpointData())
+  restoredFromCheckpointData = true
+  logInfo("Restored checkpoint data")
+}
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ObjectOutputStream, ByteArrayOutputStream, 
ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, 
ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends 
InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new 
FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  

spark git commit: [MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f4346f612 -> 0370abdfd


[MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'

Hide the error logs for 'SQLListenerMemoryLeakSuite' to avoid noise. Most of the changes are whitespace-only (re-indentation).
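
The save/override/restore pattern used in the test, as a standalone hedged sketch (log4j 1.x API, which Spark used at the time; the helper name is made up):

  import org.apache.log4j.{Level, Logger}

  // Silence expected error logs while running a noisy block, then restore the
  // previous root level in a finally so later tests are unaffected even if the
  // block throws.
  def withRootLogLevel[T](level: Level)(body: => T): T = {
    val root = Logger.getRootLogger
    val oldLevel = root.getLevel
    try {
      root.setLevel(level)
      body
    } finally {
      root.setLevel(oldLevel)
    }
  }

  // Hypothetical usage: withRootLogLevel(Level.FATAL) { runNoisyTest() }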

Author: Shixiong Zhu 

Closes #10363 from zsxwing/hide-log.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0370abdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0370abdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0370abdf

Branch: refs/heads/master
Commit: 0370abdfd636566cd8df954c6f9ea5a794d275ef
Parents: f4346f6
Author: Shixiong Zhu 
Authored: Thu Dec 17 18:18:12 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 18:18:12 2015 -0800

--
 .../sql/execution/ui/SQLListenerSuite.scala | 64 +++-
 1 file changed, 35 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0370abdf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 12a4e13..11a6ce9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -336,39 +336,45 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
 class SQLListenerMemoryLeakSuite extends SparkFunSuite {
 
   test("no memory leak") {
-val conf = new SparkConf()
-  .setMaster("local")
-  .setAppName("test")
-  .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this 
test quickly
-  .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
-val sc = new SparkContext(conf)
+val oldLogLevel = org.apache.log4j.Logger.getRootLogger().getLevel()
 try {
-  SQLContext.clearSqlListener()
-  val sqlContext = new SQLContext(sc)
-  import sqlContext.implicits._
-  // Run 100 successful executions and 100 failed executions.
-  // Each execution only has one job and one stage.
-  for (i <- 0 until 100) {
-val df = Seq(
-  (1, 1),
-  (2, 2)
-).toDF()
-df.collect()
-try {
-  df.foreach(_ => throw new RuntimeException("Oops"))
-} catch {
-  case e: SparkException => // This is expected for a failed job
+  
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.FATAL)
+  val conf = new SparkConf()
+.setMaster("local")
+.setAppName("test")
+.set("spark.task.maxFailures", "1") // Don't retry the tasks to run 
this test quickly
+.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
+  val sc = new SparkContext(conf)
+  try {
+SQLContext.clearSqlListener()
+val sqlContext = new SQLContext(sc)
+import sqlContext.implicits._
+// Run 100 successful executions and 100 failed executions.
+// Each execution only has one job and one stage.
+for (i <- 0 until 100) {
+  val df = Seq(
+(1, 1),
+(2, 2)
+  ).toDF()
+  df.collect()
+  try {
+df.foreach(_ => throw new RuntimeException("Oops"))
+  } catch {
+case e: SparkException => // This is expected for a failed job
+  }
 }
+sc.listenerBus.waitUntilEmpty(1)
+assert(sqlContext.listener.getCompletedExecutions.size <= 50)
+assert(sqlContext.listener.getFailedExecutions.size <= 50)
+// 50 for successful executions and 50 for failed executions
+assert(sqlContext.listener.executionIdToData.size <= 100)
+assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
+assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
+  } finally {
+sc.stop()
   }
-  sc.listenerBus.waitUntilEmpty(1)
-  assert(sqlContext.listener.getCompletedExecutions.size <= 50)
-  assert(sqlContext.listener.getFailedExecutions.size <= 50)
-  // 50 for successful executions and 50 for failed executions
-  assert(sqlContext.listener.executionIdToData.size <= 100)
-  assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
-  assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
 } finally {
-  sc.stop()
+  org.apache.log4j.Logger.getRootLogger().setLevel(oldLogLevel)
 }
   }
 }