spark git commit: [CORE][TESTS] minor fix of JavaSerializerSuite

2015-12-18 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 0370abdfd -> 40e52a27c


[CORE][TESTS] minor fix of JavaSerializerSuite

No JIRA was created.
The original test passed because the class cast is lazy (it is performed only
when a method of the object is invoked).
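
A minimal sketch of the lazy cast described above, not code from the patch.
`unsafeCast` is a hypothetical stand-in for a generic `deserialize[T]`: because
`T` is erased, the cast is only checked where the result is used as a `T`. On
the compilers this note has in mind, the discarded call typically does not
throw, which is why the sketch reports that case rather than assuming it.

```scala
object LazyCastDemo {
  // Hypothetical stand-in for a generic deserialize[T]: T is erased at runtime,
  // so this asInstanceOf performs no check by itself.
  def unsafeCast[T](value: Any): T = value.asInstanceOf[T]

  // Runs `body` and reports whether it raised a ClassCastException.
  def throwsClassCast(body: => Unit): Boolean =
    try { body; false }
    catch { case _: ClassCastException => true }

  // The result is discarded, so nothing forces a cast to String here.
  def discardedCallThrows(): Boolean =
    throwsClassCast { unsafeCast[String](42); () }

  // Using the result as a String forces the cast, which fails for an Int.
  def usedCallThrows(): Boolean =
    throwsClassCast {
      val s: String = unsafeCast[String](42)
      require(s.length >= 0)
    }

  def main(args: Array[String]): Unit = {
    println(s"discarded result throws: ${discardedCallThrows()}")
    println(s"used result throws: ${usedCallThrows()}")
  }
}
```

This is why the patched test binds the result to `obj` and calls a method on it.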

Author: Jeff Zhang 

Closes #10371 from zjffdu/minor_fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40e52a27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40e52a27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40e52a27

Branch: refs/heads/master
Commit: 40e52a27c74259237dd1906c0e8b54d2ae645dfb
Parents: 0370abd
Author: Jeff Zhang 
Authored: Fri Dec 18 00:49:56 2015 -0800
Committer: Reynold Xin 
Committed: Fri Dec 18 00:49:56 2015 -0800

--
 .../org/apache/spark/serializer/JavaSerializerSuite.scala   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40e52a27/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
index 20f4567..6a6ea42 100644
--- a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
@@ -23,13 +23,18 @@ class JavaSerializerSuite extends SparkFunSuite {
   test("JavaSerializer instances are serializable") {
     val serializer = new JavaSerializer(new SparkConf())
     val instance = serializer.newInstance()
-    instance.deserialize[JavaSerializer](instance.serialize(serializer))
+    val obj = instance.deserialize[JavaSerializer](instance.serialize(serializer))
+    // enforce class cast
+    obj.getClass
   }
 
   test("Deserialize object containing a primitive Class as attribute") {
     val serializer = new JavaSerializer(new SparkConf())
     val instance = serializer.newInstance()
-    instance.deserialize[JavaSerializer](instance.serialize(new ContainsPrimitiveClass()))
+    val obj = instance.deserialize[ContainsPrimitiveClass](instance.serialize(
+      new ContainsPrimitiveClass()))
+    // enforce class cast
+    obj.getClass
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12413] Fix Mesos ZK persistence

2015-12-18 Thread sarutak
Repository: spark
Updated Branches:
  refs/heads/master 40e52a27c -> 2bebaa39d


[SPARK-12413] Fix Mesos ZK persistence

I believe this fixes SPARK-12413.  I'm currently running an integration test to 
verify.
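
A minimal sketch of the difference the diff below relies on, not code from the
patch. It assumes the problem attributed to SI-6654 is that `filterKeys`
returns a lazy wrapper that Java serialization may reject, while `filter`
builds a new strict map. The `env` map and the helper `javaSerializable` are
hypothetical names introduced for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object FilterKeysDemo {
  // Returns true if Java serialization accepts the value, false if it rejects it.
  def javaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
  }

  // Hypothetical environment, standing in for request.environmentVariables.
  val env: Map[String, String] = Map("SPARK_HOME" -> "/opt/spark", "FOO" -> "bar")

  // Strict filter, as in the patched code: builds a new immutable Map.
  val strict: Map[String, String] = env.filter { case (k, _) => k != "SPARK_HOME" }

  def main(args: Array[String]): Unit = {
    // filterKeys returns a lazy wrapper over `env` (deprecated in Scala 2.13);
    // whether it serializes depends on the Scala version, so it is only reported.
    val lazyView = env.filterKeys(_ != "SPARK_HOME")
    println(s"filterKeys result serializable: ${javaSerializable(lazyView)}")
    println(s"filter result serializable: ${javaSerializable(strict)}")
  }
}
```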

Author: Michael Gummelt 

Closes #10366 from mgummelt/fix-zk-mesos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bebaa39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bebaa39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bebaa39

Branch: refs/heads/master
Commit: 2bebaa39d9da33bc93ef682959cd42c1968a6a3e
Parents: 40e52a2
Author: Michael Gummelt 
Authored: Fri Dec 18 20:18:00 2015 +0900
Committer: Kousuke Saruta 
Committed: Fri Dec 18 20:18:00 2015 +0900

--
 .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala   | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bebaa39/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index c0b9359..87d0fa8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,11 @@ private[mesos] class MesosSubmitRequestServlet(
     // cause spark-submit script to look for files in SPARK_HOME instead.
     // We only need the ability to specify where to find spark-submit script
     // which user can user spark.executor.home or spark.home configurations.
-    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
+    //
+    // Do not use `filterKeys` here to avoid SI-6654, which breaks ZK persistence
+    val environmentVariables = request.environmentVariables.filter { case (k, _) =>
+      k != "SPARK_HOME"
+    }
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
     // Construct driver description





spark git commit: [SPARK-12413] Fix Mesos ZK persistence

2015-12-18 Thread sarutak
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9177ea383 -> df0231952


[SPARK-12413] Fix Mesos ZK persistence

I believe this fixes SPARK-12413.  I'm currently running an integration test to 
verify.

Author: Michael Gummelt 

Closes #10366 from mgummelt/fix-zk-mesos.

(cherry picked from commit 2bebaa39d9da33bc93ef682959cd42c1968a6a3e)
Signed-off-by: Kousuke Saruta 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df023195
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df023195
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df023195

Branch: refs/heads/branch-1.6
Commit: df0231952e5542e9870f8dde9ecbd7ad9a50f847
Parents: 9177ea3
Author: Michael Gummelt 
Authored: Fri Dec 18 20:18:00 2015 +0900
Committer: Kousuke Saruta 
Committed: Fri Dec 18 20:21:42 2015 +0900

--
 .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala   | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/df023195/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 196338f..ec44b78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,11 @@ private[mesos] class MesosSubmitRequestServlet(
     // look for files in SPARK_HOME instead. We only need the ability to specify where to find
     // spark-submit script which user can user spark.executor.home or spark.home configurations
     // (SPARK-12345).
-    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
+
+    // Do not use `filterKeys` here to avoid SI-6654, which breaks ZK persistence
+    val environmentVariables = request.environmentVariables.filter { case (k, _) =>
+      k != "SPARK_HOME"
+    }
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
     // Construct driver description





spark git commit: [SPARK-10500][SPARKR] sparkr.zip cannot be created if /R/lib is unwritable

2015-12-18 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 a8d14cc06 -> d2f71c27c


[SPARK-10500][SPARKR] sparkr.zip cannot be created if /R/lib is unwritable

Backport https://github.com/apache/spark/pull/9390 and 
https://github.com/apache/spark/pull/9744 to branch-1.5.

Author: Sun Rui 
Author: Shivaram Venkataraman 

Closes #10372 from sun-rui/SPARK-10500-branch-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2f71c27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2f71c27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2f71c27

Branch: refs/heads/branch-1.5
Commit: d2f71c27c0f43475a69791693f1684e106e9b475
Parents: a8d14cc
Author: Sun Rui 
Authored: Fri Dec 18 07:14:37 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Dec 18 07:14:37 2015 -0800

--
 R/install-dev.bat   |  6 +++
 R/install-dev.sh|  4 ++
 R/pkg/R/sparkR.R| 14 ++-
 R/pkg/inst/profile/general.R|  3 +-
 R/pkg/inst/worker/daemon.R  |  5 ++-
 R/pkg/inst/worker/worker.R  |  3 +-
 .../scala/org/apache/spark/api/r/RBackend.scala |  1 +
 .../scala/org/apache/spark/api/r/RRDD.scala |  4 +-
 .../scala/org/apache/spark/api/r/RUtils.scala   | 37 +
 .../org/apache/spark/deploy/RPackageUtils.scala | 26 
 .../scala/org/apache/spark/deploy/RRunner.scala |  5 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 43 
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  9 ++--
 make-distribution.sh|  1 +
 14 files changed, 124 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2f71c27/R/install-dev.bat
--
diff --git a/R/install-dev.bat b/R/install-dev.bat
index 008a5c6..ed1c91a 100644
--- a/R/install-dev.bat
+++ b/R/install-dev.bat
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
 MKDIR %SPARK_HOME%\R\lib
 
 R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib"  %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
+

http://git-wip-us.apache.org/repos/asf/spark/blob/d2f71c27/R/install-dev.sh
--
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 59d98c9..4972bb9 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
 popd > /dev/null

http://git-wip-us.apache.org/repos/asf/spark/blob/d2f71c27/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 26f941e..154fbb4 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -49,6 +49,12 @@ sparkR.stop <- function() {
   }
 }
 
+# Remove the R package lib path from .libPaths()
+if (exists(".libPath", envir = env)) {
+  libPath <- get(".libPath", envir = env)
+  .libPaths(.libPaths()[.libPaths() != libPath])
+}
+
 if (exists(".backendLaunched", envir = env)) {
   callJStatic("SparkRHandler", "stopBackend")
 }
@@ -149,14 +155,20 @@ sparkR.init <- function(
 f <- file(path, open="rb")
 backendPort <- readInt(f)
 monitorPort <- readInt(f)
+rLibPath <- readString(f)
 close(f)
 file.remove(path)
 if (length(backendPort) == 0 || backendPort == 0 ||
-length(monitorPort) == 0 || monitorPort == 0) {
+length(monitorPort) == 0 || monitorPort == 0 ||
+length(rLibPath) != 1) {
   stop("JVM failed to launch")
 }
 assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
 assign(".backendLaunched", 1, envir = .sparkREnv)
+if (rLibPath != "") {
+  assign(".libPath", rLibPath, envir = .sparkREnv)
+  .libPaths(c(rLibPath, .libPaths()))
+}
   }
 
   .sparkREnv$backendPort <- backendPort

http://git-wip-us.apache.org/repos/asf/spark/blob/d2f71c27/R/pkg/inst/profile/general.R
--
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
index 2a8a821..c55fe9b 100644
--- a/R/pkg/inst/profile/general.R
+++ b/R/pkg/inst/profile/general.R
@@ -17,6 +17,7 @@
 
 .First <- function() {
   packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
-  .libPaths(c(p

spark git commit: [SPARK-12350][CORE] Don't log errors when requested stream is not found.

2015-12-18 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master ea59b0f3a -> 278281828


[SPARK-12350][CORE] Don't log errors when requested stream is not found.

If a client requests a non-existent stream, just send a failure message
back, without logging any error on the server side (since it's not a
server error).

On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is
retried on the parent class loader. This can mask IO errors during
transmission, but the most common cause is that the class is not
served by the remote end.
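
A minimal sketch of the executor-side idea described above, not the actual
`ExecutorClassLoader` code. `RemoteFirstClassLoader` and its `fetch` parameter
are hypothetical: `fetch` stands in for downloading class bytes from the remote
end. Any failure during the fetch is rethrown as a `ClassNotFoundException`, so
the lookup is retried on the fallback (parent) loader.

```scala
import java.io.IOException

// Hypothetical loader; `fetch` stands in for downloading class bytes remotely.
class RemoteFirstClassLoader(fallback: ClassLoader, fetch: String => Array[Byte])
    extends ClassLoader(fallback) {

  // Any failure while fetching is reported as "class not found". This can hide
  // a genuine IO error, which is the trade-off the commit message mentions.
  override protected def findClass(name: String): Class[_] = {
    val bytes =
      try fetch(name)
      catch { case e: Exception => throw new ClassNotFoundException(name, e) }
    defineClass(name, bytes, 0, bytes.length)
  }

  // Try the remote end first; on ClassNotFoundException retry on the parent.
  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    try findClass(name)
    catch { case _: ClassNotFoundException => super.loadClass(name, resolve) }
}

object RemoteFirstClassLoaderDemo {
  // Loads `name` through a loader whose remote fetch always fails.
  def loadWithFailingFetch(name: String): Class[_] = {
    val failingFetch: String => Array[Byte] =
      n => throw new IOException(s"stream not found: $n")
    new RemoteFirstClassLoader(ClassLoader.getSystemClassLoader, failingFetch).loadClass(name)
  }

  def main(args: Array[String]): Unit =
    println(loadWithFailingFetch("java.lang.String"))
}
```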

Author: Marcelo Vanzin 

Closes #10337 from vanzin/SPARK-12350.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27828182
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27828182
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27828182

Branch: refs/heads/master
Commit: 2782818287a71925523c1320291db6cb25221e9f
Parents: ea59b0f
Author: Marcelo Vanzin 
Authored: Fri Dec 18 09:49:08 2015 -0800
Committer: Marcelo Vanzin 
Committed: Fri Dec 18 09:49:08 2015 -0800

--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 17 
 .../spark/rpc/netty/NettyStreamManager.scala|  7 +--
 .../spark/network/server/StreamManager.java |  1 +
 .../network/server/TransportRequestHandler.java |  7 ++-
 .../apache/spark/repl/ExecutorClassLoader.scala | 21 ++--
 5 files changed, 39 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6b..975ea1a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
 }
 
 override def read(dst: ByteBuffer): Int = {
-  val result = if (error == null) {
-Try(source.read(dst))
-  } else {
-Failure(error)
-  }
-
-  result match {
+  Try(source.read(dst)) match {
 case Success(bytesRead) => bytesRead
-case Failure(error) => throw error
+case Failure(readErr) =>
+  if (error != null) {
+throw error
+  } else {
+throw readErr
+  }
   }
 }
 
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
 }
 
 override def onFailure(streamId: String, cause: Throwable): Unit = {
-  logError(s"Error downloading stream $streamId.", cause)
+  logDebug(s"Error downloading stream $streamId.", cause)
   source.setError(cause)
   sink.close()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 394cde4..afcb023 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
 new File(dir, fname)
 }
 
-require(file != null && file.isFile(), s"File not found: $streamId")
-new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+if (file != null && file.isFile()) {
+  new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+} else {
+  null
+}
   }
 
   override def addFile(file: File): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
--
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 3f01559..07f161a 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -54,6 +54,7 @@ public abstract class StreamManager {
    * {@link #getChunk(long, int)} method.
    *
    * @param streamId id of a stream that has been previously registered with the StreamManager.
+   * @return A managed buffer for the stream, or null if the stream was not found.
    */

spark git commit: [SPARK-11619][SQL] cannot use UDTF in DataFrame.selectExpr

2015-12-18 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 278281828 -> ee444fe4b


[SPARK-11619][SQL] cannot use UDTF in DataFrame.selectExpr

Description of the problem from cloud-fan:

The problem is at this line:
https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L689
When we use `selectExpr`, we pass an `UnresolvedFunction` to `DataFrame.select`
and fall into the last case. One workaround is to special-case UDTFs, as we did
for `explode` (and `json_tuple` in 1.6), and wrap them with `MultiAlias`.
Another workaround is to use `expr`, for example
`df.select(expr("explode(a)").as(Nil))`. I think `selectExpr` is no longer
needed now that we have the `expr` function.

Author: Dilip Biswal 

Closes #9981 from dilipbiswal/spark-11619.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee444fe4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee444fe4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee444fe4

Branch: refs/heads/master
Commit: ee444fe4b8c9f382524e1fa346c67ba6da8104d8
Parents: 2782818
Author: Dilip Biswal 
Authored: Fri Dec 18 09:54:30 2015 -0800
Committer: Yin Huai 
Committed: Fri Dec 18 09:54:30 2015 -0800

--
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 12 ++--
 .../apache/spark/sql/catalyst/analysis/unresolved.scala |  6 +-
 .../src/main/scala/org/apache/spark/sql/Column.scala| 12 +++-
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala |  7 +++
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala |  4 
 .../main/scala/org/apache/spark/sql/hive/HiveQl.scala   |  2 +-
 7 files changed, 31 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee444fe4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 64dd83a..c396546 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -149,12 +149,12 @@ class Analyzer(
   exprs.zipWithIndex.map {
 case (expr, i) =>
   expr transform {
-case u @ UnresolvedAlias(child) => child match {
+case u @ UnresolvedAlias(child, optionalAliasName) => child match {
   case ne: NamedExpression => ne
   case e if !e.resolved => u
   case g: Generator => MultiAlias(g, Nil)
   case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)()
-  case other => Alias(other, s"_c$i")()
+  case other => Alias(other, optionalAliasName.getOrElse(s"_c$i"))()
 }
   }
   }.asInstanceOf[Seq[NamedExpression]]
@@ -287,7 +287,7 @@ class Analyzer(
   }
 }
 val newGroupByExprs = groupByExprs.map {
-  case UnresolvedAlias(e) => e
+  case UnresolvedAlias(e, _) => e
   case e => e
 }
 Aggregate(newGroupByExprs, groupByExprs ++ pivotAggregates, child)
@@ -352,19 +352,19 @@ class Analyzer(
 Project(
   projectList.flatMap {
 case s: Star => s.expand(child, resolver)
-case UnresolvedAlias(f @ UnresolvedFunction(_, args, _)) if containsStar(args) =>
+case UnresolvedAlias(f @ UnresolvedFunction(_, args, _), _) if containsStar(args) =>
   val newChildren = expandStarExpressions(args, child)
   UnresolvedAlias(child = f.copy(children = newChildren)) :: Nil
 case Alias(f @ UnresolvedFunction(_, args, _), name) if containsStar(args) =>
   val newChildren = expandStarExpressions(args, child)
   Alias(child = f.copy(children = newChildren), name)() :: Nil
-case UnresolvedAlias(c @ CreateArray(args)) if containsStar(args) =>
+case UnresolvedAlias(c @ CreateArray(args), _) if containsStar(args) =>
   val expandedArgs = args.flatMap {
 case s: Star => s.expand(child, resolver)
 case o => o :: Nil
   }
   UnresolvedAlias(c.copy(children = expandedArgs)) :: Nil
-case UnresolvedAlias(c @ CreateStruct(args)) if containsStar(args) =>
+case UnresolvedAlias(c @ CreateStruct(args), _) if containsStar(args) =>
   val expandedArgs = args.flatMap {
 case s: Star => s.expand(child, resolver)
 case o => o :: Nil

http:/

spark git commit: [SPARK-12054] [SQL] Consider nullability of expression in codegen

2015-12-18 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master ee444fe4b -> 4af647c77


[SPARK-12054] [SQL] Consider nullability of expression in codegen

This could simplify the generated code for expressions that are not nullable.

This PR also fixes many bugs related to nullability.
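
A minimal sketch of the simplification, loosely modeled on the `BoundReference`
hunk below and not taken from the patch. `genRead`, `Generated`, and the
variable names `row`, `value` and `isNull` in the generated Java text are
hypothetical. When the input is known to be non-nullable, the null check is
omitted and `isNull` becomes the constant `false`.

```scala
object NullabilityCodegenDemo {
  // Hypothetical, simplified result of generating code for one column read.
  final case class Generated(isNull: String, code: String)

  // Emits Java source text that reads column `ordinal` from a variable `row`.
  // `getter` (e.g. "getInt") and `default` are supplied by the caller.
  def genRead(
      javaType: String,
      getter: String,
      default: String,
      ordinal: Int,
      nullable: Boolean): Generated =
    if (nullable) {
      // Nullable input: check for null first and fall back to the default.
      Generated(
        isNull = "isNull",
        code =
          s"""boolean isNull = row.isNullAt($ordinal);
             |$javaType value = isNull ? $default : row.$getter($ordinal);""".stripMargin)
    } else {
      // Non-nullable input: no null check, and isNull is a constant.
      Generated(isNull = "false", code = s"$javaType value = row.$getter($ordinal);")
    }

  def main(args: Array[String]): Unit = {
    println(genRead("int", "getInt", "-1", 0, nullable = true).code)
    println(genRead("int", "getInt", "-1", 0, nullable = false).code)
  }
}
```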

Author: Davies Liu 

Closes #10333 from davies/skip_nullable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af647c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af647c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af647c7

Branch: refs/heads/master
Commit: 4af647c77ded6a0d3087ceafb2e30e01d97e7a06
Parents: ee444fe
Author: Davies Liu 
Authored: Fri Dec 18 10:09:17 2015 -0800
Committer: Davies Liu 
Committed: Fri Dec 18 10:09:17 2015 -0800

--
 .../catalyst/expressions/BoundAttribute.scala   | 17 ++--
 .../spark/sql/catalyst/expressions/Cast.scala   | 28 +++---
 .../sql/catalyst/expressions/Expression.scala   | 95 +---
 .../aggregate/CentralMomentAgg.scala|  2 +-
 .../catalyst/expressions/aggregate/Corr.scala   |  3 +-
 .../catalyst/expressions/aggregate/Count.scala  | 19 ++--
 .../catalyst/expressions/aggregate/Sum.scala| 24 +++--
 .../expressions/codegen/CodegenFallback.scala   | 27 --
 .../codegen/GenerateMutableProjection.scala | 65 +-
 .../codegen/GenerateUnsafeProjection.scala  | 21 +++--
 .../expressions/complexTypeExtractors.scala | 19 ++--
 .../expressions/datetimeExpressions.scala   |  4 +
 .../expressions/decimalExpressions.scala|  1 +
 .../catalyst/expressions/jsonExpressions.scala  | 15 ++--
 .../catalyst/expressions/mathExpressions.scala  |  5 ++
 .../spark/sql/catalyst/expressions/misc.scala   |  1 +
 .../expressions/stringExpressions.scala |  1 +
 .../expressions/windowExpressions.scala |  4 +-
 .../catalyst/plans/logical/basicOperators.scala |  9 +-
 .../sql/catalyst/expressions/CastSuite.scala| 10 +--
 .../catalyst/expressions/ComplexTypeSuite.scala |  1 -
 .../scala/org/apache/spark/sql/DataFrame.scala  | 21 +++--
 .../org/apache/spark/sql/execution/Window.scala |  9 +-
 .../apache/spark/sql/execution/commands.scala   |  2 +-
 .../spark/sql/execution/datasources/ddl.scala   |  2 +-
 .../sql/execution/joins/HashOuterJoin.scala | 80 +
 .../execution/joins/SortMergeOuterJoin.scala|  2 +-
 27 files changed, 261 insertions(+), 226 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4af647c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index ff1f28d..7293d5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -69,10 +69,17 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
 val javaType = ctx.javaType(dataType)
 val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
-s"""
-  boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal);
-  $javaType ${ev.value} = ${ev.isNull} ? ${ctx.defaultValue(dataType)} : ($value);
-"""
+if (nullable) {
+  s"""
+boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal);
+$javaType ${ev.value} = ${ev.isNull} ? ${ctx.defaultValue(dataType)} : ($value);
+  """
+} else {
+  ev.isNull = "false"
+  s"""
+$javaType ${ev.value} = $value;
+  """
+}
   }
 }
 
@@ -92,7 +99,7 @@ object BindReferences extends Logging {
 sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
   }
 } else {
-  BoundReference(ordinal, a.dataType, a.nullable)
+  BoundReference(ordinal, a.dataType, input(ordinal).nullable)
 }
   }
 }.asInstanceOf[A] // Kind of a hack, but safe.  TODO: Tighten return type when possible.

http://git-wip-us.apache.org/repos/asf/spark/blob/4af647c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index cb60d59..b18f49f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b

spark git commit: [SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

2015-12-18 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 4af647c77 -> 41ee7c57a


[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source 
filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND 
expressions partially.
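
A minimal sketch of why partial conversion of a nested AND is unsafe, not code
from the patch. The `Filter` types, `convertPartial`, `convertStrict` and `scan`
are hypothetical; column `b` is modeled as an Int for brevity. With
`NOT(a = 2 AND <unsupported>)`, keeping only the convertible side yields
`NOT(a = 2)`, which drops a row that the full predicate would keep.

```scala
object NestedAndPushdownDemo {
  type Row = Map[String, Int]
  type Pred = Row => Boolean

  sealed trait Filter
  final case class Eq(col: String, value: Int) extends Filter
  final case class Unsupported(description: String) extends Filter // cannot be converted
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Not(child: Filter) extends Filter

  private def both(a: Pred, b: Pred): Pred = row => a(row) && b(row)
  private def negate(p: Pred): Pred = row => !p(row)

  // Old behavior: if only one side of AND converts, keep just that side.
  def convertPartial(f: Filter): Option[Pred] = f match {
    case Eq(c, v) => Some((row: Row) => row(c) == v)
    case Unsupported(_) => None
    case And(l, r) =>
      (convertPartial(l), convertPartial(r)) match {
        case (Some(a), Some(b)) => Some(both(a, b))
        case (a, b) => a.orElse(b)
      }
    case Not(c) => convertPartial(c).map(negate)
  }

  // Fixed behavior: AND converts only when both sides convert.
  def convertStrict(f: Filter): Option[Pred] = f match {
    case Eq(c, v) => Some((row: Row) => row(c) == v)
    case Unsupported(_) => None
    case And(l, r) =>
      for {
        a <- convertStrict(l)
        b <- convertStrict(r)
      } yield both(a, b)
    case Not(c) => convertStrict(c).map(negate)
  }

  // Rows that survive the pushed-down filter; with no filter, every row is
  // returned and the engine is expected to evaluate the full predicate later.
  def scan(rows: Seq[Row], pushed: Option[Pred]): Seq[Row] =
    pushed.fold(rows)(p => rows.filter(p))

  val rows: Seq[Row] = (1 to 5).map(i => Map("a" -> i, "b" -> i % 2))
  val filter: Filter = Not(And(Eq("a", 2), Unsupported("b in ('1')")))

  def main(args: Array[String]): Unit = {
    println(s"partial conversion keeps ${scan(rows, convertPartial(filter)).size} rows")
    println(s"strict conversion keeps ${scan(rows, convertStrict(filter)).size} rows")
  }
}
```

The row with a = 2 has b = 0, so it satisfies the original predicate, yet the
partial conversion filters it out at the data source.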

Author: Yin Huai 

Closes #10362 from yhuai/SPARK-12218.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41ee7c57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41ee7c57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41ee7c57

Branch: refs/heads/master
Commit: 41ee7c57abd9f52065fd7ffb71a8af229603371d
Parents: 4af647c
Author: Yin Huai 
Authored: Fri Dec 18 10:52:14 2015 -0800
Committer: Yin Huai 
Committed: Fri Dec 18 10:53:13 2015 -0800

--
 .../datasources/parquet/ParquetFilters.scala| 12 ++-
 .../parquet/ParquetFilterSuite.scala| 19 +
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 22 +---
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 20 ++
 4 files changed, 60 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/41ee7c57/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0771432..883013b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -257,7 +257,17 @@ private[sql] object ParquetFilters {
 makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
   case sources.And(lhs, rhs) =>
-(createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+// At here, it is not safe to just convert one side if we do not understand the
+// other side. Here is an example used to explain the reason.
+// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+// convert b in ('1'). If we only convert a = 2, we will end up with a filter
+// NOT(a = 2), which will generate wrong results.
+// Pushing one side of AND down is only safe to do at the top level.
+// You can see ParquetRelation's initializeLocalJobFunc method as an example.
+for {
+  lhsFilter <- createFilter(schema, lhs)
+  rhsFilter <- createFilter(schema, rhs)
+} yield FilterApi.and(lhsFilter, rhsFilter)
 
   case sources.Or(lhs, rhs) =>
 for {

http://git-wip-us.apache.org/repos/asf/spark/blob/41ee7c57/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 6178e37..045425f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -362,4 +362,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+import testImplicits._
+
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+  withTempPath { dir =>
+val path = s"${dir.getCanonicalPath}/table1"
+(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+  }
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/41ee7c57/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 27193f5..ebfb175 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hi

spark git commit: [SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

2015-12-18 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 df0231952 -> 1dc71ec77


[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source 
filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND 
expressions partially.

Author: Yin Huai 

Closes #10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c57abd9f52065fd7ffb71a8af229603371d)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc71ec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc71ec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc71ec7

Branch: refs/heads/branch-1.6
Commit: 1dc71ec777ff7cac5d3d7adb13f2d63ffe8909b6
Parents: df02319
Author: Yin Huai 
Authored: Fri Dec 18 10:52:14 2015 -0800
Committer: Yin Huai 
Committed: Fri Dec 18 10:53:31 2015 -0800

--
 .../datasources/parquet/ParquetFilters.scala| 12 ++-
 .../parquet/ParquetFilterSuite.scala| 19 +
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 22 +---
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 20 ++
 4 files changed, 60 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0771432..883013b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -257,7 +257,17 @@ private[sql] object ParquetFilters {
 makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
   case sources.And(lhs, rhs) =>
-(createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+// At here, it is not safe to just convert one side if we do not understand the
+// other side. Here is an example used to explain the reason.
+// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+// convert b in ('1'). If we only convert a = 2, we will end up with a filter
+// NOT(a = 2), which will generate wrong results.
+// Pushing one side of AND down is only safe to do at the top level.
+// You can see ParquetRelation's initializeLocalJobFunc method as an example.
+for {
+  lhsFilter <- createFilter(schema, lhs)
+  rhsFilter <- createFilter(schema, rhs)
+} yield FilterApi.and(lhsFilter, rhsFilter)
 
   case sources.Or(lhs, rhs) =>
 for {

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index cc5aae0..ed5a352 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -364,4 +364,23 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
   }
 }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+import testImplicits._
+
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+  withTempPath { dir =>
+val path = s"${dir.getCanonicalPath}/table1"
+(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", 
"b").write.parquet(path)
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2) or not(b in 
('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+  }
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/o

spark git commit: [SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

2015-12-18 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d2f71c27c -> afffe24c0


[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source 
filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND 
expressions partially.
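
The reason partial pushdown of a nested AND is unsafe can be illustrated with a toy filter tree. This is only a sketch: `PushdownSketch`, its case classes, and both `convert*` functions are hypothetical stand-ins, not Spark's `ParquetFilters` API. It assumes `b in ('1')` is the one predicate that cannot be converted, mirroring the example in the code comment of this commit.

```scala
object PushdownSketch {
  // Toy filter tree; these names are illustrative only.
  sealed trait Filter
  case class EqualTo(attr: String, value: Int) extends Filter
  case class In(attr: String, values: Set[String]) extends Filter // assumed not convertible
  case class And(lhs: Filter, rhs: Filter) extends Filter
  case class Not(child: Filter) extends Filter

  type Row = (Int, String)     // columns (a, b)
  type Pushed = Row => Boolean // a "pushed down" predicate

  private def both(x: Pushed, y: Pushed): Pushed = (row: Row) => x(row) && y(row)

  // Unsafe variant: keeps whichever side of AND happens to convert.
  def convertPartial(f: Filter): Option[Pushed] = f match {
    case EqualTo("a", v) => Some((row: Row) => row._1 == v)
    case And(l, r) =>
      (convertPartial(l), convertPartial(r)) match {
        case (Some(x), Some(y)) => Some(both(x, y))
        case (x, None) => x
        case (None, y) => y
      }
    case Not(c) => convertPartial(c).map(p => (row: Row) => !p(row))
    case _ => None
  }

  // Safe variant: AND converts only when both sides convert.
  def convertSafe(f: Filter): Option[Pushed] = f match {
    case EqualTo("a", v) => Some((row: Row) => row._1 == v)
    case And(l, r) =>
      for {
        x <- convertSafe(l)
        y <- convertSafe(r)
      } yield both(x, y)
    case Not(c) => convertSafe(c).map(p => (row: Row) => !p(row))
    case _ => None
  }

  // Same data as the test added in this commit: (i, (i % 2).toString) for i in 1..5.
  val rows: Seq[Row] = (1 to 5).map(i => (i, (i % 2).toString))
  val filter: Filter = Not(And(EqualTo("a", 2), In("b", Set("1"))))

  def main(args: Array[String]): Unit = {
    // The partial conversion degenerates to NOT(a = 2) and wrongly drops a row.
    println(convertPartial(filter).map(p => rows.count(p)))
    // The safe conversion pushes nothing, leaving the full predicate to the engine.
    println(convertSafe(filter).isDefined)
  }
}
```

With the safe variant nothing is pushed down for this predicate, so every row reaches the engine, which can then evaluate the whole expression correctly.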

Author: Yin Huai 

Closes #10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c57abd9f52065fd7ffb71a8af229603371d)
Signed-off-by: Yin Huai 

Conflicts:

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afffe24c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afffe24c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afffe24c

Branch: refs/heads/branch-1.5
Commit: afffe24c00f49c199383bd07530266a8124d77f2
Parents: d2f71c2
Author: Yin Huai 
Authored: Fri Dec 18 10:52:14 2015 -0800
Committer: Yin Huai 
Committed: Fri Dec 18 10:55:21 2015 -0800

--
 .../datasources/parquet/ParquetFilters.scala| 12 ++-
 .../parquet/ParquetFilterSuite.scala| 19 +
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 22 +---
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 20 ++
 4 files changed, 60 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 1f0405f..3f7a409 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -264,7 +264,17 @@ private[sql] object ParquetFilters {
 makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
   case sources.And(lhs, rhs) =>
-(createFilter(schema, lhs) ++ createFilter(schema, 
rhs)).reduceOption(FilterApi.and)
+// At here, it is not safe to just convert one side if we do not 
understand the
+// other side. Here is an example used to explain the reason.
+// Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
+// convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
+// NOT(a = 2), which will generate wrong results.
+// Pushing one side of AND down is only safe to do at the top level.
+// You can see ParquetRelation's initializeLocalJobFunc method as an 
example.
+for {
+  lhsFilter <- createFilter(schema, lhs)
+  rhsFilter <- createFilter(schema, rhs)
+} yield FilterApi.and(lhsFilter, rhsFilter)
 
   case sources.Or(lhs, rhs) =>
 for {

http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f88ddc7..05d305a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,23 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
   }
 }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+import testImplicits._
+
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+  withTempPath { dir =>
+val path = s"${dir.getCanonicalPath}/table1"
+(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", 
"b").write.parquet(path)
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2) or not(b in 
('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+checkAnswer(
+  sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+  (1 to 5).map(i => Row(i, (i % 2).toString)))
+  }
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git 
a/sql/

spark git commit: Revert "[SPARK-12365][CORE] Use ShutdownHookManager where Runtime.getRuntime.addShutdownHook() is called"

2015-12-18 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1dc71ec77 -> 3b903e44b


Revert "[SPARK-12365][CORE] Use ShutdownHookManager where 
Runtime.getRuntime.addShutdownHook() is called"

This reverts commit 4af64385b085002d94c54d11bbd144f9f026bbd8.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b903e44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b903e44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b903e44

Branch: refs/heads/branch-1.6
Commit: 3b903e44b912cd36ec26e9e95444656eee7b0c46
Parents: 1dc71ec
Author: Andrew Or 
Authored: Fri Dec 18 12:56:03 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 12:56:03 2015 -0800

--
 .../spark/deploy/ExternalShuffleService.scala | 18 --
 .../deploy/mesos/MesosClusterDispatcher.scala | 13 -
 .../apache/spark/util/ShutdownHookManager.scala   |  4 
 scalastyle-config.xml | 12 
 4 files changed, 20 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3b903e44/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 7fc96e4..e8a1e35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap
 import org.apache.spark.network.server.{TransportServerBootstrap, 
TransportServer}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * Provides a server from which Executors can read shuffle files (rather than 
reading directly from
@@ -118,13 +118,19 @@ object ExternalShuffleService extends Logging {
 server = newShuffleService(sparkConf, securityManager)
 server.start()
 
-ShutdownHookManager.addShutdownHook { () =>
-  logInfo("Shutting down shuffle service.")
-  server.stop()
-  barrier.countDown()
-}
+installShutdownHook()
 
 // keep running until the process is terminated
 barrier.await()
   }
+
+  private def installShutdownHook(): Unit = {
+Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service 
shutdown thread") {
+  override def run() {
+logInfo("Shutting down shuffle service.")
+server.stop()
+barrier.countDown()
+  }
+})
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b903e44/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
 
b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 389eff5..5d4e5b8 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.mesos.MesosRestServer
 import org.apache.spark.scheduler.cluster.mesos._
-import org.apache.spark.util.{ShutdownHookManager, SignalLogger}
+import org.apache.spark.util.SignalLogger
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /*
@@ -103,11 +103,14 @@ private[mesos] object MesosClusterDispatcher extends 
Logging {
 }
 val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
 dispatcher.start()
-ShutdownHookManager.addShutdownHook { () =>
-  logInfo("Shutdown hook is shutting down dispatcher")
-  dispatcher.stop()
-  dispatcher.awaitShutdown()
+val shutdownHook = new Thread() {
+  override def run() {
+logInfo("Shutdown hook is shutting down dispatcher")
+dispatcher.stop()
+dispatcher.awaitShutdown()
+  }
 }
+Runtime.getRuntime.addShutdownHook(shutdownHook)
 dispatcher.awaitShutdown()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b903e44/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 1a0f3b4..620f226 100644
---

spark git commit: [SPARK-12404][SQL] Ensure objects passed to StaticInvoke is Serializable

2015-12-18 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 41ee7c57a -> 6eba65525


[SPARK-12404][SQL] Ensure objects passed to StaticInvoke is Serializable

Currently `StaticInvoke` receives its target object as `Any`. `StaticInvoke` itself can be 
serialized, but sometimes the object passed to it is not serializable.

For example, the following code raises an exception because `RowEncoder#extractorsFor`, 
which is invoked indirectly, creates a `StaticInvoke`.

```
case class TimestampContainer(timestamp: java.sql.Timestamp)
val rdd = sc.parallelize(1 to 2).map(_ =>
  TimestampContainer(new java.sql.Timestamp(System.currentTimeMillis)))
val df = rdd.toDF
val ds = df.as[TimestampContainer]
val rdd2 = ds.rdd  // invokes extractorsFor indirectly
```

I'll add test cases.
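
The underlying Java serialization behaviour can be reproduced without Spark. The sketch below is an illustration under assumptions: `TimeUtils` and `InvokeLike` are hypothetical stand-ins for a utility singleton such as `DateTimeUtils` and for an expression node such as `StaticInvoke`. It shows that a node holding a plain Scala singleton fails to serialize, while a node holding that singleton's `Class` (as the diff does with `DateTimeUtils.getClass`) serializes fine, because `java.lang.Class` implements `Serializable`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object StaticInvokeSketch {
  // Stand-in for a utility singleton; it does not extend Serializable.
  object TimeUtils {
    def toMillis(seconds: Long): Long = seconds * 1000L
  }

  // Stand-in for an expression node. Case classes are Serializable,
  // but serialization still fails if a field value is not.
  case class InvokeLike(target: Any, functionName: String)

  // Attempts Java serialization into an in-memory buffer.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Holding the singleton instance itself.
    println(canSerialize(InvokeLike(TimeUtils, "toMillis")))
    // Holding only the singleton's Class object.
    println(canSerialize(InvokeLike(TimeUtils.getClass, "toMillis")))
  }
}
```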

Author: Kousuke Saruta 
Author: Michael Armbrust 

Closes #10357 from sarutak/SPARK-12404.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6eba6552
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6eba6552
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6eba6552

Branch: refs/heads/master
Commit: 6eba655259d2bcea27d0147b37d5d1e476e85422
Parents: 41ee7c5
Author: Kousuke Saruta 
Authored: Fri Dec 18 14:05:06 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Dec 18 14:05:06 2015 -0800

--
 .../spark/sql/catalyst/JavaTypeInference.scala  | 12 ++---
 .../spark/sql/catalyst/ScalaReflection.scala| 16 +++---
 .../sql/catalyst/encoders/RowEncoder.scala  | 14 +++---
 .../sql/catalyst/expressions/objects.scala  |  8 ++-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 52 
 .../org/apache/spark/sql/DatasetSuite.scala | 12 +
 6 files changed, 88 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6eba6552/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index c8ee87e..f566d1b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -194,7 +194,7 @@ object JavaTypeInference {
 
   case c if c == classOf[java.sql.Date] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
   ObjectType(c),
   "toJavaDate",
   getPath :: Nil,
@@ -202,7 +202,7 @@ object JavaTypeInference {
 
   case c if c == classOf[java.sql.Timestamp] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
   ObjectType(c),
   "toJavaTimestamp",
   getPath :: Nil,
@@ -276,7 +276,7 @@ object JavaTypeInference {
 ObjectType(classOf[Array[Any]]))
 
 StaticInvoke(
-  ArrayBasedMapData,
+  ArrayBasedMapData.getClass,
   ObjectType(classOf[JMap[_, _]]),
   "toJavaMap",
   keyData :: valueData :: Nil)
@@ -341,21 +341,21 @@ object JavaTypeInference {
 
 case c if c == classOf[java.sql.Timestamp] =>
   StaticInvoke(
-DateTimeUtils,
+DateTimeUtils.getClass,
 TimestampType,
 "fromJavaTimestamp",
 inputObject :: Nil)
 
 case c if c == classOf[java.sql.Date] =>
   StaticInvoke(
-DateTimeUtils,
+DateTimeUtils.getClass,
 DateType,
 "fromJavaDate",
 inputObject :: Nil)
 
 case c if c == classOf[java.math.BigDecimal] =>
   StaticInvoke(
-Decimal,
+Decimal.getClass,
 DecimalType.SYSTEM_DEFAULT,
 "apply",
 inputObject :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/6eba6552/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index ecff860..c1b1d5c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -223,7 +223,7 @@ object ScalaReflection extends ScalaReflection {
 
   case t if t <:< localTypeOf[java.sql.Date] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
   ObjectType(classOf[java.sql.Date]),
   "toJavaDate",
   getPath :: Nil,
@@ -231,7 +231,7 

spark git commit: [SPARK-12404][SQL] Ensure objects passed to StaticInvoke is Serializable

2015-12-18 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3b903e44b -> bd33d4ee8


[SPARK-12404][SQL] Ensure objects passed to StaticInvoke is Serializable

Currently `StaticInvoke` receives its target object as `Any`. `StaticInvoke` itself can be 
serialized, but sometimes the object passed to it is not serializable.

For example, the following code raises an exception because `RowEncoder#extractorsFor`, 
which is invoked indirectly, creates a `StaticInvoke`.

```
case class TimestampContainer(timestamp: java.sql.Timestamp)
val rdd = sc.parallelize(1 to 2).map(_ =>
  TimestampContainer(new java.sql.Timestamp(System.currentTimeMillis)))
val df = rdd.toDF
val ds = df.as[TimestampContainer]
val rdd2 = ds.rdd  // invokes extractorsFor indirectly
```

I'll add test cases.

Author: Kousuke Saruta 
Author: Michael Armbrust 

Closes #10357 from sarutak/SPARK-12404.

(cherry picked from commit 6eba655259d2bcea27d0147b37d5d1e476e85422)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd33d4ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd33d4ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd33d4ee

Branch: refs/heads/branch-1.6
Commit: bd33d4ee847973289a58032df35375f03e9f9865
Parents: 3b903e4
Author: Kousuke Saruta 
Authored: Fri Dec 18 14:05:06 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Dec 18 14:05:16 2015 -0800

--
 .../spark/sql/catalyst/JavaTypeInference.scala  | 12 ++---
 .../spark/sql/catalyst/ScalaReflection.scala| 16 +++---
 .../sql/catalyst/encoders/RowEncoder.scala  | 14 +++---
 .../sql/catalyst/expressions/objects.scala  |  8 ++-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 52 
 .../org/apache/spark/sql/DatasetSuite.scala | 12 +
 6 files changed, 88 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bd33d4ee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index c8ee87e..f566d1b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -194,7 +194,7 @@ object JavaTypeInference {
 
   case c if c == classOf[java.sql.Date] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
   ObjectType(c),
   "toJavaDate",
   getPath :: Nil,
@@ -202,7 +202,7 @@ object JavaTypeInference {
 
   case c if c == classOf[java.sql.Timestamp] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
   ObjectType(c),
   "toJavaTimestamp",
   getPath :: Nil,
@@ -276,7 +276,7 @@ object JavaTypeInference {
 ObjectType(classOf[Array[Any]]))
 
 StaticInvoke(
-  ArrayBasedMapData,
+  ArrayBasedMapData.getClass,
   ObjectType(classOf[JMap[_, _]]),
   "toJavaMap",
   keyData :: valueData :: Nil)
@@ -341,21 +341,21 @@ object JavaTypeInference {
 
 case c if c == classOf[java.sql.Timestamp] =>
   StaticInvoke(
-DateTimeUtils,
+DateTimeUtils.getClass,
 TimestampType,
 "fromJavaTimestamp",
 inputObject :: Nil)
 
 case c if c == classOf[java.sql.Date] =>
   StaticInvoke(
-DateTimeUtils,
+DateTimeUtils.getClass,
 DateType,
 "fromJavaDate",
 inputObject :: Nil)
 
 case c if c == classOf[java.math.BigDecimal] =>
   StaticInvoke(
-Decimal,
+Decimal.getClass,
 DecimalType.SYSTEM_DEFAULT,
 "apply",
 inputObject :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd33d4ee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 9b6b5b8..ea98956 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -223,7 +223,7 @@ object ScalaReflection extends ScalaReflection {
 
   case t if t <:< localTypeOf[java.sql.Date] =>
 StaticInvoke(
-  DateTimeUtils,
+  DateTimeUtils.getClass,
 

spark git commit: [SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs

2015-12-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 6eba65525 -> 2377b707f


[SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs

 - Provide example on `message handler`
 - Provide bit on KPL record de-aggregation
 - Fix typos
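
The "message handler" idea from the first bullet can be sketched without the Kinesis library. Everything named here is an assumption for illustration: `RecordLike` is a hypothetical stand-in for the Kinesis `Record` class, and `handleAll` only mimics the point where a handler would be applied. It is not the `KinesisUtils` API.

```scala
import java.nio.charset.StandardCharsets

object MessageHandlerSketch {
  // Hypothetical stand-in for a Kinesis record: a partition key plus a raw payload.
  case class RecordLike(partitionKey: String, data: Array[Byte])

  // A handler maps one record to whatever type the application wants downstream.
  // This one keeps the partition key next to the decoded payload.
  def keyAndPayload(record: RecordLike): (String, String) =
    (record.partitionKey, new String(record.data, StandardCharsets.UTF_8))

  // Shows where a handler plugs in: every record is mapped to a T.
  def handleAll[T](records: Seq[RecordLike])(handler: RecordLike => T): Seq[T] =
    records.map(handler)

  def main(args: Array[String]): Unit = {
    val records = Seq(RecordLike("user-1", "hello".getBytes(StandardCharsets.UTF_8)))
    handleAll(records)(r => keyAndPayload(r)).foreach(println)
  }
}
```

Without a handler the stream carries only the payload bytes, so a function of this shape is what lets the application keep other fields of the record.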

Author: Burak Yavuz 

Closes #9970 from brkyvz/kinesis-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2377b707
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2377b707
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2377b707

Branch: refs/heads/master
Commit: 2377b707f25449f4557bf048bb384c743d9008e5
Parents: 6eba655
Author: Burak Yavuz 
Authored: Fri Dec 18 15:24:41 2015 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 18 15:24:41 2015 -0800

--
 docs/streaming-kinesis-integration.md | 54 +-
 1 file changed, 45 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2377b707/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index 238a911..07194b0 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -23,7 +23,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
**Note that by linking to this library, you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
 
-2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream as follows:
+2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream of byte array as follows:
 


@@ -36,7 +36,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the Running the Example section for instructions on how to run the 
example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions on how to run the example.
 


@@ -49,7 +49,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2);
 
See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the next subsection for instructions to run the example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 


@@ -60,18 +60,47 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the next subsection for instructions to run the example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 


 
-- `streamingContext`: StreamingContext containg an application name used 
by Kinesis to tie this Kinesis application to the Kinesis stream
+   You may also provide a "message handler function" that takes a Kinesis 
`Record` and returns a generic object `T`, in case you would like to use other 
data included in a `Record` such as partition key. This is currently only 
supported in Scala and Java.
 
-   - `[Kineiss app name]`: The application name that will be used to 
checkpoint the Kinesis
+   
+   
+
+   

spark git commit: [SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs

2015-12-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bd33d4ee8 -> eca401ee5


[SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs

 - Provide example on `message handler`
 - Provide bit on KPL record de-aggregation
 - Fix typos

Author: Burak Yavuz 

Closes #9970 from brkyvz/kinesis-docs.

(cherry picked from commit 2377b707f25449f4557bf048bb384c743d9008e5)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eca401ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eca401ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eca401ee

Branch: refs/heads/branch-1.6
Commit: eca401ee5d3ae683cbee531c1f8bc981f9603fc8
Parents: bd33d4e
Author: Burak Yavuz 
Authored: Fri Dec 18 15:24:41 2015 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 18 15:24:49 2015 -0800

--
 docs/streaming-kinesis-integration.md | 54 +-
 1 file changed, 45 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eca401ee/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index 238a911..07194b0 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -23,7 +23,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
**Note that by linking to this library, you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
 
-2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream as follows:
+2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream of byte array as follows:
 


@@ -36,7 +36,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the Running the Example section for instructions on how to run the 
example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions on how to run the example.
 


@@ -49,7 +49,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2);
 
See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the next subsection for instructions to run the example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 


@@ -60,18 +60,47 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
[region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
-   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the next subsection for instructions to run the example.
+   and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 


 
-- `streamingContext`: StreamingContext containg an application name used 
by Kinesis to tie this Kinesis application to the Kinesis stream
+   You may also provide a "message handler function" that takes a Kinesis 
`Record` and returns a generic object `T`, in case you would like to use other 
data included in a `Record` such as partition key. This is currently only 
supported in Scala and Java.
 
-   - `[Kineiss

spark git commit: [SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval

2015-12-18 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 60da0e11f -> 0514e8d4b


[SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat 
interval

Previously, the rpc timeout was the default network timeout, which is the same value
the driver uses to determine dead executors. This means if there is a network issue,
the executor is determined dead after one heartbeat attempt. There is a separate config
for the heartbeat interval which is a better value to use for the heartbeat RPC. With
this change, the executor will make multiple heartbeat attempts even with RPC issues.
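
The arithmetic behind this can be sketched as follows. This is a rough model, not Spark code: `attemptsBeforeDeclaredDead` is a hypothetical helper that ignores retry waits and scheduling delays, and the 120-second network timeout is an assumed value. The 10-second figure is the `"10s"` default visible in the diff.

```scala
import scala.concurrent.duration._

object HeartbeatTimeoutSketch {
  // Rough upper bound on how many heartbeat RPCs can time out back to back
  // before the driver's dead-executor threshold elapses.
  def attemptsBeforeDeclaredDead(rpcTimeout: FiniteDuration, deadAfter: FiniteDuration): Long =
    deadAfter.toMillis / rpcTimeout.toMillis

  def main(args: Array[String]): Unit = {
    val deadAfter = 120.seconds        // assumption: value of the network timeout
    val heartbeatInterval = 10.seconds // the "10s" default from the diff

    // Old behaviour: the RPC timeout equals the dead-executor threshold.
    println(attemptsBeforeDeclaredDead(deadAfter, deadAfter))
    // New behaviour: the RPC timeout equals the heartbeat interval.
    println(attemptsBeforeDeclaredDead(heartbeatInterval, deadAfter))
  }
}
```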

Author: Nong Li 

Closes #10365 from nongli/spark-12411.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0514e8d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0514e8d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0514e8d4

Branch: refs/heads/master
Commit: 0514e8d4b69615ba8918649e7e3c46b5713b6540
Parents: 60da0e1
Author: Nong Li 
Authored: Fri Dec 18 16:05:18 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:05:18 2015 -0800

--
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0514e8d4/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 552b644..9b14184 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -445,7 +446,8 @@ private[spark] class Executor(
 
 val message = Heartbeat(executorId, tasksMetrics.toArray, 
env.blockManager.blockManagerId)
 try {
-  val response = 
heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
+  val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+  message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
   if (response.reregisterBlockManager) {
 logInfo("Told to re-register on heartbeat")
 env.blockManager.reregister()





spark git commit: [SPARK-11097][CORE] Add channelActive callback to RpcHandler to monitor the new connections

2015-12-18 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 0514e8d4b -> 007a32f90


[SPARK-11097][CORE] Add channelActive callback to RpcHandler to monitor the new connections

Added `channelActive` to `RpcHandler` so that `NettyRpcHandler` doesn't need `clients` any more.

Author: Shixiong Zhu 

Closes #10301 from zsxwing/network-events.
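
The idea in the description above, a handler that is told when a channel becomes active rather than inferring new connections from the first message, can be sketched in Python. All class and method names here are hypothetical stand-ins, not Spark's network API.

```python
class RpcHandler:
    """Hypothetical base handler with connection lifecycle hooks."""

    def channel_active(self, client):
        """Called once when a connection is established; no-op by default."""

    def channel_inactive(self, client):
        """Called once when a connection is closed; no-op by default."""

    def receive(self, client, message):
        raise NotImplementedError


class RecordingHandler(RpcHandler):
    """Records events; needs no per-client 'seen before' map."""

    def __init__(self):
        self.events = []

    def channel_active(self, client):
        self.events.append(("connected", client))

    def channel_inactive(self, client):
        self.events.append(("disconnected", client))

    def receive(self, client, message):
        self.events.append(("message", client, message))


def simulate_connection(handler, client, messages):
    """Drive the callbacks in the order a transport layer might."""
    handler.channel_active(client)
    for message in messages:
        handler.receive(client, message)
    handler.channel_inactive(client)


handler = RecordingHandler()
simulate_connection(handler, "10.0.0.1:5000", ["ping", "pong"])
```

Because the transport layer (simulated here) invokes `channel_active` exactly once per connection, the handler does not have to deduplicate "connected" notifications itself.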


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007a32f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007a32f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007a32f9

Branch: refs/heads/master
Commit: 007a32f90af1065bfa3ca4cdb194c40c06e87abf
Parents: 0514e8d
Author: Shixiong Zhu 
Authored: Fri Dec 18 16:06:37 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:06:37 2015 -0800

--
 .../mesos/MesosExternalShuffleService.scala |   2 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala|  17 +--
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 148 +++
 .../spark/rpc/netty/NettyRpcHandlerSuite.scala  |   6 +-
 .../client/TransportResponseHandler.java|   6 +-
 .../spark/network/sasl/SaslRpcHandler.java  |   9 +-
 .../spark/network/server/MessageHandler.java|   7 +-
 .../apache/spark/network/server/RpcHandler.java |   9 +-
 .../network/server/TransportChannelHandler.java |  21 ++-
 .../network/server/TransportRequestHandler.java |   9 +-
 .../spark/network/sasl/SparkSaslSuite.java  |   6 +-
 11 files changed, 148 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/007a32f9/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 8ffcfc0..4172d92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -65,7 +65,7 @@ private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportCo
   /**
* On connection termination, clean up shuffle files written by the associated application.
*/
-  override def connectionTerminated(client: TransportClient): Unit = {
+  override def channelInactive(client: TransportClient): Unit = {
 val address = client.getSocketAddress
 if (connectedApps.contains(address)) {
   val appId = connectedApps(address)

http://git-wip-us.apache.org/repos/asf/spark/blob/007a32f9/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 975ea1a..090a1b9 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -548,10 +548,6 @@ private[netty] class NettyRpcHandler(
 nettyEnv: NettyRpcEnv,
 streamManager: StreamManager) extends RpcHandler with Logging {
 
-  // TODO: Can we add connection callback (channel registered) to the underlying framework?
-  // A variable to track whether we should dispatch the RemoteProcessConnected message.
-  private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
-
   // A variable to track the remote RpcEnv addresses of all clients
   private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
 
@@ -574,9 +570,6 @@ private[netty] class NettyRpcHandler(
 val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
 assert(addr != null)
 val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
-if (clients.putIfAbsent(client, JBoolean.TRUE) == null) {
-  dispatcher.postToAll(RemoteProcessConnected(clientAddr))
-}
 val requestMessage = nettyEnv.deserialize[RequestMessage](client, message)
 if (requestMessage.senderAddress == null) {
   // Create a new message with the socket address of the client as the sender.
@@ -613,10 +606,16 @@ private[netty] class NettyRpcHandler(
 }
   }
 
-  override def connectionTerminated(client: TransportClient): Unit = {
+  override def channelActive(client: TransportClient): Unit = {
+val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
+assert(addr != null)
+val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+dispatcher.postToAll(RemoteProcessConnected(clientAddr))
+  }
+
+  override def channelInactive(client: TransportClient): Unit = {
 val addr = 
client.getChannel.remoteA

spark git commit: [SPARK-12345][CORE] Do not send SPARK_HOME through Spark submit REST interface

2015-12-18 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 007a32f90 -> ba9332edd


[SPARK-12345][CORE] Do not send SPARK_HOME through Spark submit REST interface

SPARK_HOME is usually an invalid location on the remote machine executing the job.
It is picked up by the Mesos support in cluster mode, and most of the time causes
the job to fail.

Fixes SPARK-12345

Author: Luc Bourlier 

Closes #10329 from skyluc/issue/SPARK_HOME.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba9332ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba9332ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba9332ed

Branch: refs/heads/master
Commit: ba9332edd889730c906404041bc83b1643d80961
Parents: 007a32f
Author: Luc Bourlier 
Authored: Fri Dec 18 16:21:01 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:21:01 2015 -0800

--
 .../org/apache/spark/deploy/rest/RestSubmissionClient.scala| 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba9332ed/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index f0dd667..0744c64 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -428,8 +428,10 @@ private[spark] object RestSubmissionClient {
* Filter non-spark environment variables from any environment.
*/
   private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
-env.filter { case (k, _) =>
-  (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
+env.filterKeys { k =>
+  // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345)
+  (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") ||
+k.startsWith("MESOS_")
 }
   }
 }
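
The predicate in the hunk above keeps `SPARK_*` variables except `SPARK_ENV_LOADED` and `SPARK_HOME`, plus every `MESOS_*` variable. A Python transliteration of that rule, meant only as an illustration (the function name and sample values are hypothetical):

```python
# Names excluded even though they start with "SPARK_", per the hunk above.
_EXCLUDED = {"SPARK_ENV_LOADED", "SPARK_HOME"}


def filter_system_environment(env):
    """Keep SPARK_* (minus the excluded names) and MESOS_* variables."""
    return {
        key: value
        for key, value in env.items()
        if (key.startswith("SPARK_") and key not in _EXCLUDED)
        or key.startswith("MESOS_")
    }


sample = {
    "SPARK_HOME": "/opt/spark",       # dropped: usually wrong on the remote machine
    "SPARK_ENV_LOADED": "1",          # dropped
    "SPARK_USER": "alice",            # kept
    "MESOS_NATIVE_LIBRARY": "/lib/libmesos.so",  # kept
    "PATH": "/usr/bin",               # dropped: not a Spark or Mesos variable
}
print(sorted(filter_system_environment(sample)))
```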





[3/3] spark git commit: Revert "[SPARK-12345][MESOS] Filter SPARK_HOME when submitting Spark jobs with Mesos cluster mode."

2015-12-18 Thread andrewor14
Revert "[SPARK-12345][MESOS] Filter SPARK_HOME when submitting Spark jobs with Mesos cluster mode."

This reverts commit ad8c1f0b840284d05da737fb2cc5ebf8848f4490.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a78a91f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a78a91f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a78a91f4

Branch: refs/heads/master
Commit: a78a91f4d7239c14bd5d0b18cdc87d55594a8d8a
Parents: 8a9417b
Author: Andrew Or 
Authored: Fri Dec 18 16:22:51 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:22:51 2015 -0800

--
 .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala  | 7 +--
 .../spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala   | 2 +-
 2 files changed, 2 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a78a91f4/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 24510db..868cc35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -94,12 +94,7 @@ private[mesos] class MesosSubmitRequestServlet(
 val driverMemory = sparkProperties.get("spark.driver.memory")
 val driverCores = sparkProperties.get("spark.driver.cores")
 val appArgs = request.appArgs
-// We don't want to pass down SPARK_HOME when launching Spark apps
-// with Mesos cluster mode since it's populated by default on the client and it will
-// cause spark-submit script to look for files in SPARK_HOME instead.
-// We only need the ability to specify where to find spark-submit script
-// which user can user spark.executor.home or spark.home configurations.
-val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
+val environmentVariables = request.environmentVariables
 val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
 // Construct driver description

http://git-wip-us.apache.org/repos/asf/spark/blob/a78a91f4/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 573355b..721861f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
- * methods the Mesos scheduler will use.
+ * methods and Mesos scheduler will use.
  */
 private[mesos] trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered





[1/3] spark git commit: Revert "[SPARK-12413] Fix Mesos ZK persistence"

2015-12-18 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master ba9332edd -> a78a91f4d


Revert "[SPARK-12413] Fix Mesos ZK persistence"

This reverts commit 2bebaa39d9da33bc93ef682959cd42c1968a6a3e.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14be5dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14be5dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14be5dec

Branch: refs/heads/master
Commit: 14be5dece291c900bf97d87b850d906717645fd4
Parents: ba9332e
Author: Andrew Or 
Authored: Fri Dec 18 16:22:33 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:22:33 2015 -0800

--
 .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala   | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14be5dec/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 87d0fa8..c0b9359 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,11 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
 // cause spark-submit script to look for files in SPARK_HOME instead.
 // We only need the ability to specify where to find spark-submit script
 // which user can user spark.executor.home or spark.home configurations.
-//
-// Do not use `filterKeys` here to avoid SI-6654, which breaks ZK persistence
-val environmentVariables = request.environmentVariables.filter { case (k, _) =>
-  k != "SPARK_HOME"
-}
+val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
 val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
 // Construct driver description





[2/3] spark git commit: Revert "[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server"

2015-12-18 Thread andrewor14
Revert "[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server"

This reverts commit 8184568810e8a2e7d5371db2c6a0366ef4841f70.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a9417bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a9417bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a9417bc

Branch: refs/heads/master
Commit: 8a9417bc4b227c57bdb0a5a38a225dbdf6e69f64
Parents: 14be5de
Author: Andrew Or 
Authored: Fri Dec 18 16:22:41 2015 -0800
Committer: Andrew Or 
Committed: Fri Dec 18 16:22:41 2015 -0800

--
 .../scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8a9417bc/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index c0b9359..24510db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
 // cause spark-submit script to look for files in SPARK_HOME instead.
 // We only need the ability to specify where to find spark-submit script
 // which user can user spark.executor.home or spark.home configurations.
-val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
+val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
 val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
 // Construct driver description
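
The one-word difference in this hunk matters in Scala: `Map.filter` hands its predicate a `(key, value)` pair, and a pair never equals the string `"SPARK_HOME"`, so nothing is dropped, whereas `filterKeys` hands it the key alone. A Python analogue of the two predicates, as an illustration only (both function names are hypothetical):

```python
def keep_unless_entry_equals(env, name="SPARK_HOME"):
    """Analogue of filter(!_.equals(name)): compares the whole (key, value) pair."""
    return dict(entry for entry in env.items() if entry != name)


def keep_unless_key_equals(env, name="SPARK_HOME"):
    """Analogue of filterKeys(!_.equals(name)): compares only the key."""
    return {key: value for key, value in env.items() if key != name}


env = {"SPARK_HOME": "/opt/spark", "SPARK_USER": "alice"}

# A tuple is never equal to a string, so the first variant removes nothing.
print(keep_unless_entry_equals(env))
# The second variant removes the SPARK_HOME entry.
print(keep_unless_key_equals(env))
```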





spark git commit: [SPARK-12091] [PYSPARK] Deprecate the JAVA-specific deserialized storage levels

2015-12-18 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master a78a91f4d -> 499ac3e69


[SPARK-12091] [PYSPARK] Deprecate the JAVA-specific deserialized storage levels

The current default storage level of Python persist API is MEMORY_ONLY_SER. 
This is different from the default level MEMORY_ONLY in the official document 
and RDD APIs.

davies Is this inconsistency intentional? Thanks!

Updates: Since the data is always serialized on the Python side, the storage 
levels of JAVA-specific deserialization are not removed, such as MEMORY_ONLY.

Updates: Based on the reviewers' feedback. In Python, stored objects will 
always be serialized with the 
[Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not 
matter whether you choose a serialized level. The available storage levels in 
Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK`, 
`MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.
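
The point that Python-side records are always pickled, whatever storage level is chosen, can be illustrated with the standard `pickle` module alone. This is a sketch under that assumption; `store_partition` and `load_partition` are hypothetical helpers and not PySpark's actual code path.

```python
import pickle


def store_partition(records):
    """Hypothetical stand-in for caching one partition: records become bytes."""
    return pickle.dumps(list(records))


def load_partition(blob):
    """Recover the original records from the pickled bytes."""
    return pickle.loads(blob)


records = [("a", 1), ("b", 2)]
blob = store_partition(records)

# What gets stored is a byte string either way, so a separate
# "serialized" level would not change the Python-side representation.
print(type(blob).__name__)
print(load_partition(blob) == records)
```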

Author: gatorsmile 

Closes #10092 from gatorsmile/persistStorageLevel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/499ac3e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/499ac3e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/499ac3e6

Branch: refs/heads/master
Commit: 499ac3e69a102f9b10a1d7e14382fa191516f7b5
Parents: a78a91f
Author: gatorsmile 
Authored: Fri Dec 18 20:06:05 2015 -0800
Committer: Davies Liu 
Committed: Fri Dec 18 20:06:05 2015 -0800

--
 docs/configuration.md   |  7 ---
 docs/programming-guide.md   | 10 ++
 python/pyspark/rdd.py   |  8 
 python/pyspark/sql/dataframe.py |  6 +++---
 python/pyspark/storagelevel.py  | 31 +--
 python/pyspark/streaming/context.py |  2 +-
 python/pyspark/streaming/dstream.py |  4 ++--
 python/pyspark/streaming/flume.py   |  4 ++--
 python/pyspark/streaming/kafka.py   |  2 +-
 python/pyspark/streaming/mqtt.py|  2 +-
 10 files changed, 45 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index 38d3d05..85e7d12 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -687,9 +687,10 @@ Apart from these, the following properties are also available, and may be useful
   spark.rdd.compress
   false
   
-Whether to compress serialized RDD partitions (e.g. for
-StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the cost of some
-extra CPU time.
+Whether to compress serialized RDD partitions (e.g. for 
+StorageLevel.MEMORY_ONLY_SER in Java 
+and Scala or StorageLevel.MEMORY_ONLY in Python). 
+Can save substantial space at the cost of some extra CPU time. 
   
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f823b89..c5e2a1c 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1196,14 +1196,14 @@ storage levels is:
 partitions that don't fit on disk, and read them from there when they're needed. 
 
 
-   MEMORY_ONLY_SER 
+   MEMORY_ONLY_SER  (Java and Scala) 
 Store RDD as serialized Java objects (one byte array per partition).
 This is generally more space-efficient than deserialized objects, especially when using a
 fast serializer, but more CPU-intensive to read.
   
 
 
-   MEMORY_AND_DISK_SER 
+   MEMORY_AND_DISK_SER  (Java and Scala) 
 Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
 recomputing them on the fly each time they're needed. 
 
@@ -1230,7 +1230,9 @@ storage levels is:
 
 
 
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, 
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, 
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.*
 
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
 
@@ -1243,7 +1245,7 @@ efficiency. We recommend going t