spark git commit: [SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo

2015-03-26 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0ba759985 -> 825499655


[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
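Kryo's default serializer instantiates a class through a (possibly non-public) no-arg constructor before filling in its fields, which `GenericRowWithSchema` lacked. A minimal sketch of the requirement, using a stand-in class rather than the real Spark types (class and field names here are purely illustrative):

```
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Stand-in for a row class that carries a schema alongside its values.
class RowWithSchema(val values: Array[Any], val schemaJson: String) {
  // Without a no-arg constructor (even a protected one), Kryo's default
  // FieldSerializer cannot create the object during deserialization.
  protected def this() = this(null, null)
}

object KryoRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)

    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeClassAndObject(output, new RowWithSchema(Array(1, "a"), """{"id":"int"}"""))
    output.close()

    // Deserialization goes through the no-arg constructor, then sets the fields.
    val back = kryo.readClassAndObject(new Input(bytes.toByteArray)).asInstanceOf[RowWithSchema]
    println(back.schemaJson) // {"id":"int"}
  }
}
```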

Author: Michael Armbrust mich...@databricks.com

Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following 
commits:

bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema

(cherry picked from commit f88f51bbd461e0a42ad7021147268509b9c3c56e)
Signed-off-by: Cheng Lian l...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82549965
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82549965
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82549965

Branch: refs/heads/branch-1.3
Commit: 8254996557512b8bbc8fd35c550004b56144581f
Parents: 0ba7599
Author: Michael Armbrust mich...@databricks.com
Authored: Thu Mar 26 18:46:57 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Thu Mar 26 18:47:15 2015 +0800

--
 .../spark/sql/catalyst/expressions/rows.scala |  7 +--
 .../org/apache/spark/sql/types/Metadata.scala |  3 +++
 .../org/apache/spark/sql/types/dataTypes.scala| 18 ++
 .../spark/sql/execution/SparkSqlSerializer.scala  |  4 +---
 .../scala/org/apache/spark/sql/RowSuite.scala | 12 
 5 files changed, 39 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 8bba26b..a8983df 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -66,7 +66,7 @@ object EmptyRow extends Row {
  */
 class GenericRow(protected[sql] val values: Array[Any]) extends Row {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)
 
   def this(size: Int) = this(new Array[Any](size))
 
@@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) 
extends Row {
 
 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
 }
 
 class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)
 
   def this(size: Int) = this(new Array[Any](size))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index e50e976..6ee24ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
 sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   extends Serializable {
 
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Tests whether this Metadata contains a binding for a key. */
   def contains(key: String): Boolean = map.contains(key)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index d973144..952cf5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
  */
 @DeveloperApi
 case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends 
FractionalType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   private[sql] type JvmType = Decimal
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { 
typeTag[JvmType] }
   private[sql] val numeric = Decimal.DecimalIsFractional
@@ -819,6 +823,10 

spark git commit: [MLlib]remove unused import

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 1c05027a1 -> 3ddb975fa


[MLlib]remove unused import

Minor thing. Let me know if a JIRA is required.

Author: Yuhao Yang hhb...@gmail.com

Closes #5207 from hhbyyh/adjustImport and squashes the following commits:

2240121 [Yuhao Yang] remove unused import


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ddb975f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ddb975f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ddb975f

Branch: refs/heads/master
Commit: 3ddb975faeddeb2674a7e7f7e80cf90dfbd4d6d2
Parents: 1c05027
Author: Yuhao Yang hhb...@gmail.com
Authored: Thu Mar 26 13:27:05 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 13:27:05 2015 +

--
 mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ddb975f/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 5e17c8d..9d63a08 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.clustering
 
 import java.util.Random
 
-import breeze.linalg.{DenseVector => BDV, normalize, axpy => brzAxpy}
+import breeze.linalg.{DenseVector => BDV, normalize}
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental





spark git commit: [SPARK-6491] Spark will put the current working dir to the CLASSPATH

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 836c92165 -> 5b5f0e2b0


[SPARK-6491] Spark will put the current working dir to the CLASSPATH

When running bin/compute-classpath.sh, the output will be:
:/spark/conf:/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.5.0-cdh5.2.0.jar:/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/lib_managed/jars/datanucleus-core-3.2.10.jar
Because of the leading colon, Java adds the current working directory to the CLASSPATH, which is not what Spark users expect.
For example, if I call spark-shell in /root and a core-site.xml exists under /root/, Spark will use that file as the Hadoop configuration file, even though I have already set HADOOP_CONF_DIR=/etc/hadoop/conf.
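The fix routes every entry through a helper that only inserts a separator between two non-empty pieces, so an unset variable can no longer produce a leading colon. A hedged restatement of that shell logic in Scala (names are illustrative, not part of the patch):

```
// Join classpath entries, skipping empty ones so no leading ":" can appear
// (the JVM treats an empty classpath entry as the current working directory).
def appendToClasspath(classpath: String, entry: String): String =
  if (entry.isEmpty) classpath
  else if (classpath.isEmpty) entry
  else s"$classpath:$entry"

val entries = Seq(sys.env.getOrElse("SPARK_CLASSPATH", ""), "/spark/conf", "/spark/assembly.jar")
val classpath = entries.foldLeft("")(appendToClasspath)
// With SPARK_CLASSPATH unset: "/spark/conf:/spark/assembly.jar" -- no leading ":".
```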

Author: guliangliang guliangli...@qiyi.com

Closes #5156 from marsishandsome/Spark6491 and squashes the following commits:

5ae214f [guliangliang] use appendToClasspath to change CLASSPATH
b21f3b2 [guliangliang] keep the classpath order
5d1f870 [guliangliang] [SPARK-6491] Spark will put the current working dir to 
the CLASSPATH


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5f0e2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5f0e2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5f0e2b

Branch: refs/heads/branch-1.3
Commit: 5b5f0e2b08941bde2655b1aec9b2ae28c377be78
Parents: 836c921
Author: guliangliang guliangli...@qiyi.com
Authored: Thu Mar 26 13:28:56 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 13:28:56 2015 +

--
 bin/compute-classpath.sh | 81 ++-
 1 file changed, 41 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b5f0e2b/bin/compute-classpath.sh
--
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index f28..679dfaf 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -25,17 +25,24 @@ FWDIR=$(cd `dirname $0`/..; pwd)
 
 . $FWDIR/bin/load-spark-env.sh
 
-if [ -n "$SPARK_CLASSPATH" ]; then
-  CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
-else
-  CLASSPATH="$SPARK_SUBMIT_CLASSPATH"
-fi
+function appendToClasspath(){
+  if [ -n "$1" ]; then
+    if [ -n "$CLASSPATH" ]; then
+      CLASSPATH="$CLASSPATH:$1"
+    else
+      CLASSPATH="$1"
+    fi
+  fi
+}
+
+appendToClasspath "$SPARK_CLASSPATH"
+appendToClasspath "$SPARK_SUBMIT_CLASSPATH"
 
 # Build up classpath
 if [ -n "$SPARK_CONF_DIR" ]; then
-  CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
+  appendToClasspath "$SPARK_CONF_DIR"
 else
-  CLASSPATH="$CLASSPATH:$FWDIR/conf"
+  appendToClasspath "$FWDIR/conf"
 fi
 
 ASSEMBLY_DIR=$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION
@@ -51,20 +58,20 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
     "classes ahead of assembly." >&2
   # Spark classes
-  CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath 

spark git commit: [SPARK-6468][Block Manager] Fix the race condition of subDirs in DiskBlockManager

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master f88f51bbd -> 0c88ce541


[SPARK-6468][Block Manager] Fix the race condition of subDirs in 
DiskBlockManager

There are two race conditions of `subDirs` in `DiskBlockManager`:

1. `getAllFiles` does not use the correct locks to read the contents of `subDirs`. Although 
it's designed for testing, it's still worth adding the correct locks to eliminate the race 
condition.
2. The double-checked locking in `getFile(filename: String)` has a race condition. If a 
thread finds `subDirs(dirId)(subDirId)` is not null outside the `synchronized` block, it may 
not see the correct contents of the File instance pointed to by `subDirs(dirId)(subDirId)`, 
according to the Java memory model (there is no volatile variable here).

This PR fixes the above race conditions.
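A minimal standalone sketch of the corrected pattern (not the Spark code itself; the class and base path are hypothetical): every read and write of a slot in the mutable array happens under the array's lock, and listing takes a snapshot under that lock before touching the contents.

```
import java.io.{File, IOException}

class LazySubDirs(numDirs: Int, root: String) {
  // Slots are lazily filled; all access to them is guarded by subDirs' lock.
  private val subDirs = new Array[File](numDirs)

  def getDir(i: Int): File = subDirs.synchronized {
    val old = subDirs(i)
    if (old != null) {
      old
    } else {
      val newDir = new File(root, f"$i%02x")
      if (!newDir.exists() && !newDir.mkdirs()) {
        throw new IOException(s"Failed to create $newDir")
      }
      subDirs(i) = newDir
      newDir
    }
  }

  // Copy the array inside the lock, then work on the copy outside it.
  def allDirs(): Seq[File] = {
    val snapshot = subDirs.synchronized { subDirs.clone() }
    snapshot.filter(_ != null)
  }
}
```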

Author: zsxwing zsxw...@gmail.com

Closes #5136 from zsxwing/SPARK-6468 and squashes the following commits:

cbb872b [zsxwing] Fix the race condition of subDirs in DiskBlockManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88ce54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88ce54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88ce54

Branch: refs/heads/master
Commit: 0c88ce5416d7687bc806a7655e17009ad5823d30
Parents: f88f51b
Author: zsxwing zsxw...@gmail.com
Authored: Thu Mar 26 12:54:48 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 12:54:48 2015 +

--
 .../apache/spark/storage/DiskBlockManager.scala | 32 +++-
 1 file changed, 18 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c88ce54/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea..2883137 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
+  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+  // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private val shutdownHook = addShutdownHook()
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-          if (!newDir.exists() && !newDir.mkdir()) {
-            throw new IOException(s"Failed to create local dir in $newDir.")
-          }
-          subDirs(dirId)(subDirId) = newDir
-          newDir
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
         }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
       }
     }
 
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatten.filter(_ != null).flatMap { dir =>
+    subDirs.flatMap { dir =>
+      dir.synchronized {
+        // Copy the content of dir because it may be modified in other threads
+        dir.clone()
+      }
+    }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
       if (files != null) files else Seq.empty
     }





spark git commit: [SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

2015-03-26 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 0c88ce541 -> 1c05027a1


[SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema 
to support dropping of columns using replace columns

Currently, in the Parquet relation 2 implementation, an error is thrown when the merged 
schema is not exactly the same as the metastore schema.
But to support cases like dropping a column with the REPLACE COLUMNS command, we can relax 
the restriction so that the query still works even when the metastore schema is only a 
subset of the merged Parquet schema.
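The relaxed check is exercised by the test case added below; restated as a sketch (`ParquetRelation2` is `private[sql]`, so this compiles only inside the `org.apache.spark.sql` package and is shown purely for illustration):

```
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.types._

// The metastore schema is a strict subset of the merged Parquet schema. Before
// this change the size assertion failed; now the merge keeps only the metastore
// columns, using the Parquet field names but the metastore data types.
val metastoreSchema = StructType(Seq(
  StructField("uppercase", DoubleType, nullable = false)))

val parquetSchema = StructType(Seq(
  StructField("lowerCase", BinaryType),
  StructField("UPPERCase", IntegerType, nullable = true)))

val merged = ParquetRelation2.mergeMetastoreParquetSchema(metastoreSchema, parquetSchema)
// merged == StructType(Seq(StructField("UPPERCase", DoubleType, nullable = false)))
```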

Author: Yash Datta yash.da...@guavus.com

Closes #5141 from saucam/replace_col and squashes the following commits:

e858d5b [Yash Datta] SPARK-6471: Fix test cases, add a new test case for 
metastore schema to be subset of parquet schema
5f2f467 [Yash Datta] SPARK-6471: Metastore schema should only be a subset of 
parquet schema to support dropping of columns using replace columns


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c05027a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c05027a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c05027a

Branch: refs/heads/master
Commit: 1c05027a143d1b0bf3df192984e6cac752b1e926
Parents: 0c88ce5
Author: Yash Datta yash.da...@guavus.com
Authored: Thu Mar 26 21:13:38 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Thu Mar 26 21:13:38 2015 +0800

--
 .../org/apache/spark/sql/parquet/newParquet.scala |  5 +++--
 .../spark/sql/parquet/ParquetSchemaSuite.scala| 18 --
 2 files changed, 19 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c05027a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 410600b..3516cfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -758,12 +758,13 @@ private[sql] object ParquetRelation2 extends Logging {
          |${parquetSchema.prettyJson}
        """.stripMargin
 
-    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
+    assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)
 
     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
+    val reorderedParquetSchema = parquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
 
     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
       // Uses Parquet field names but retains Metastore data types.

http://git-wip-us.apache.org/repos/asf/spark/blob/1c05027a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 321832c..8462f9b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -212,8 +212,11 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
           StructField("UPPERCase", IntegerType, nullable = true))))
     }
 
-    // Conflicting field count
-    assert(intercept[Throwable] {
+    // Metastore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
       ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
@@ -221,6 +224,17 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructType(Seq(
           StructField("lowerCase", BinaryType),
           StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))
 
 // Conflicting field names



spark git commit: [SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

2015-03-26 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 825499655 -> 836c92165


[SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema 
to support dropping of columns using replace columns

Currently, in the Parquet relation 2 implementation, an error is thrown when the merged 
schema is not exactly the same as the metastore schema.
But to support cases like dropping a column with the REPLACE COLUMNS command, we can relax 
the restriction so that the query still works even when the metastore schema is only a 
subset of the merged Parquet schema.

Author: Yash Datta yash.da...@guavus.com

Closes #5141 from saucam/replace_col and squashes the following commits:

e858d5b [Yash Datta] SPARK-6471: Fix test cases, add a new test case for 
metastore schema to be subset of parquet schema
5f2f467 [Yash Datta] SPARK-6471: Metastore schema should only be a subset of 
parquet schema to support dropping of columns using replace columns

(cherry picked from commit 1c05027a143d1b0bf3df192984e6cac752b1e926)
Signed-off-by: Cheng Lian l...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/836c9216
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/836c9216
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/836c9216

Branch: refs/heads/branch-1.3
Commit: 836c9216599b676ae8f421384f4f20fd35e8c53b
Parents: 8254996
Author: Yash Datta yash.da...@guavus.com
Authored: Thu Mar 26 21:13:38 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Thu Mar 26 21:14:15 2015 +0800

--
 .../org/apache/spark/sql/parquet/newParquet.scala |  5 +++--
 .../spark/sql/parquet/ParquetSchemaSuite.scala| 18 --
 2 files changed, 19 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/836c9216/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 410600b..3516cfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -758,12 +758,13 @@ private[sql] object ParquetRelation2 extends Logging {
          |${parquetSchema.prettyJson}
        """.stripMargin
 
-    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
+    assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)
 
     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
+    val reorderedParquetSchema = parquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
 
     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
       // Uses Parquet field names but retains Metastore data types.

http://git-wip-us.apache.org/repos/asf/spark/blob/836c9216/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 321832c..8462f9b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -212,8 +212,11 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
           StructField("UPPERCase", IntegerType, nullable = true))))
     }
 
-    // Conflicting field count
-    assert(intercept[Throwable] {
+    // Metastore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
       ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
@@ -221,6 +224,17 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructType(Seq(
           StructField("lowerCase", BinaryType),
           StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))
 

spark git commit: [SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.

2015-03-26 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 39fb57968 -> 49d2ec63e


[SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.

Kryo buffers are backed by byte arrays, but primitive arrays can only be
up to 2GB in size. It is misleading to allow users to set buffers past
this size.
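With this change an oversized setting fails fast when the serializer is constructed instead of overflowing later. A small sketch of the user-facing effect (assuming plain driver-side configuration):

```
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// 2048 MB and above is rejected because the backing byte array cannot exceed 2 GB.
val tooBig = new SparkConf().set("spark.kryoserializer.buffer.max.mb", "2048")
// new KryoSerializer(tooBig)  // throws IllegalArgumentException after this patch

// Anything below the limit behaves as before.
val ok = new SparkConf().set("spark.kryoserializer.buffer.max.mb", "512")
val serializer = new KryoSerializer(ok)
```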

Author: mcheah mch...@palantir.com

Closes #5218 from mccheah/feature/limit-kryo-buffer and squashes the following 
commits:

1d6d1be [mcheah] Fixing numeric typo
e2e30ce [mcheah] Removing explicit int and double type to match style
09fd80b [mcheah] Should be >= not >. Slightly more consistent error message.
60634f9 [mcheah] [SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49d2ec63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49d2ec63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49d2ec63

Branch: refs/heads/master
Commit: 49d2ec63eccec8a3a78b15b583c36f84310fc6f0
Parents: 39fb579
Author: mcheah mch...@palantir.com
Authored: Thu Mar 26 22:48:42 2015 -0700
Committer: Patrick Wendell patr...@databricks.com
Committed: Thu Mar 26 22:48:42 2015 -0700

--
 .../apache/spark/serializer/KryoSerializer.scala| 16 +---
 1 file changed, 13 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/49d2ec63/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f83bcaa..579fb66 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -49,10 +49,20 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSize =
-    (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt
+  private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
+  if (bufferSizeMb >= 2048) {
+    throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
+      s"2048 mb, got: + $bufferSizeMb mb.")
+  }
+  private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
+
+  val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
+  if (maxBufferSizeMb >= 2048) {
+    throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
+      s"2048 mb, got: + $maxBufferSizeMb mb.")
+  }
+  private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
 
-  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val userRegistrator = conf.getOption("spark.kryo.registrator")





spark git commit: SPARK-6532 [BUILD] LDAModel.scala fails scalastyle on Windows

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fe15ea976 -> c3a52a082


SPARK-6532 [BUILD] LDAModel.scala fails scalastyle on Windows

Use standard UTF-8 source / report encoding for scalastyle

Author: Sean Owen so...@cloudera.com

Closes #5211 from srowen/SPARK-6532 and squashes the following commits:

16a33e5 [Sean Owen] Use standard UTF-8 source / report encoding for scalastyle


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3a52a08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3a52a08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3a52a08

Branch: refs/heads/master
Commit: c3a52a08248db08eade29b265f02483144a282d6
Parents: fe15ea9
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 10:52:31 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 10:52:31 2015 -0700

--
 pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3a52a08/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 23bb161..b3cecd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1452,7 +1452,8 @@
           <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
           <configLocation>scalastyle-config.xml</configLocation>
           <outputFile>scalastyle-output.xml</outputFile>
-          <outputEncoding>UTF-8</outputEncoding>
+          <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+          <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
         </configuration>
         <executions>
           <execution>





spark git commit: [SPARK-6554] [SQL] Don't push down predicates which reference partition column(s)

2015-03-26 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 784fcd532 -> 71a0d40eb


[SPARK-6554] [SQL] Don't push down predicates which reference partition 
column(s)

There are two cases for the new Parquet data source:

1. Partition columns exist in the Parquet data files

   We don't need to push-down these predicates since partition pruning already 
handles them.

2. Partition columns don't exist in the Parquet data files

   We can't push-down these predicates since they are considered as invalid 
columns by Parquet.
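The regression test added below covers the second case; a hedged restatement of the user-visible scenario (assuming a `SQLContext` named `sqlContext` with Parquet filter push-down enabled, and a throwaway path):

```
import sqlContext.implicits._

// Write Parquet data into a partition directory; "part" exists only in the path.
val path = "/tmp/spark-6554-demo/part=1"  // hypothetical location
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)

// Filtering on the partition column must rely on partition pruning. If "part = 1"
// were pushed into the Parquet reader, the query would fail because "part" is not
// a column of the underlying files.
sqlContext.parquetFile(path).filter("part = 1").collect()
// => Array(Row(1, "1", 1), Row(2, "2", 1), Row(3, "3", 1))
```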


Author: Cheng Lian l...@databricks.com

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition 
column(s)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71a0d40e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71a0d40e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71a0d40e

Branch: refs/heads/master
Commit: 71a0d40ebd37c80d8020e184366778b57c762285
Parents: 784fcd5
Author: Cheng Lian l...@databricks.com
Authored: Thu Mar 26 13:11:37 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Mar 26 13:11:37 2015 -0700

--
 .../org/apache/spark/sql/parquet/newParquet.scala  | 17 -
 .../spark/sql/parquet/ParquetFilterSuite.scala | 17 +
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3516cfe..0d68810 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -435,11 +435,18 @@ private[sql] case class ParquetRelation2(
     // Push down filters when possible. Notice that not all filters can be converted to Parquet
     // filter predicate. Here we try to convert each individual predicate and only collect those
     // convertible ones.
-    predicates
-      .flatMap(ParquetFilters.createFilter)
-      .reduceOption(FilterApi.and)
-      .filter(_ => sqlContext.conf.parquetFilterPushDown)
-      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    if (sqlContext.conf.parquetFilterPushDown) {
+      predicates
+        // Don't push down predicates which reference partition columns
+        .filter { pred =>
+          val partitionColNames = partitionColumns.map(_.name).toSet
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
+        .flatMap(ParquetFilters.createFilter)
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    }
 
 if (isPartitioned) {
   logInfo {

http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4d32e84..6a2c2a7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -321,6 +321,23 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6554: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.parquetFile(path).filter("part = 1"),
+          (1 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOffFilterSuite extends 

spark git commit: [SPARK-6117] [SQL] Improvements to DataFrame.describe()

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 84735c363 -> 28e3a1e34


[SPARK-6117] [SQL] Improvements to DataFrame.describe()

1. Slight modifications to the code to make it more readable.
2. Added Python implementation.
3. Updated the documentation to state that we don't guarantee the output schema 
for this function and it should only be used for exploratory data analysis.

Author: Reynold Xin r...@databricks.com

Closes #5201 from rxin/df-describe and squashes the following commits:

25a7834 [Reynold Xin] Reset run-tests.
6abdfee [Reynold Xin] [SPARK-6117] [SQL] Improvements to DataFrame.describe()

(cherry picked from commit 784fcd532784fcfd9bf0a1db71c9f71c469ee716)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28e3a1e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28e3a1e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28e3a1e3

Branch: refs/heads/branch-1.3
Commit: 28e3a1e34db2f6e16891d6ae154db10c8170e5e4
Parents: 84735c3
Author: Reynold Xin r...@databricks.com
Authored: Thu Mar 26 12:26:13 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 12:26:26 2015 -0700

--
 python/pyspark/sql/dataframe.py | 19 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 46 
 .../org/apache/spark/sql/DataFrameSuite.scala   |  3 +-
 3 files changed, 48 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/28e3a1e3/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bf7c47b..d51309f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -520,6 +520,25 @@ class DataFrame(object):
 
 orderBy = sort
 
+    def describe(self, *cols):
+        """Computes statistics for numeric columns.
+
+        This include count, mean, stddev, min, and max. If no columns are
+        given, this function computes statistics for all numerical columns.
+
+        >>> df.describe().show()
+        summary age
+        count   2
+        mean    3.5
+        stddev  1.5
+        min     2
+        max     5
+        """
+        cols = ListConverter().convert(cols,
+                                       self.sql_ctx._sc._gateway._gateway_client)
+        jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols))
+        return DataFrame(jdf, self.sql_ctx)
+
 def head(self, n=None):
  Return the first `n` rows or the first row if n is None.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/28e3a1e3/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index db56182..4c80359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, 
ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType, StructField, 
StringType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, 
CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,15 +752,17 @@ class DataFrame private[sql](
   }
 
   /**
-   * Compute numerical statistics for given columns of this [[DataFrame]]:
-   * count, mean (avg), stddev (standard deviation), min, max.
-   * Each row of the resulting [[DataFrame]] contains column with statistic 
name
-   * and columns with statistic results for each given column.
-   * If no columns are given then computes for all numerical columns.
+   * Computes statistics for numeric columns, including count, mean, stddev, 
min, and max.
+   * If no columns are given, this function computes statistics for all 
numerical columns.
+   *
+   * This function is meant for 

spark git commit: [SPARK-6117] [SQL] Improvements to DataFrame.describe()

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master c3a52a082 -> 784fcd532


[SPARK-6117] [SQL] Improvements to DataFrame.describe()

1. Slight modifications to the code to make it more readable.
2. Added Python implementation.
3. Updated the documentation to state that we don't guarantee the output schema 
for this function and it should only be used for exploratory data analysis.

Author: Reynold Xin r...@databricks.com

Closes #5201 from rxin/df-describe and squashes the following commits:

25a7834 [Reynold Xin] Reset run-tests.
6abdfee [Reynold Xin] [SPARK-6117] [SQL] Improvements to DataFrame.describe()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/784fcd53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/784fcd53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/784fcd53

Branch: refs/heads/master
Commit: 784fcd532784fcfd9bf0a1db71c9f71c469ee716
Parents: c3a52a0
Author: Reynold Xin r...@databricks.com
Authored: Thu Mar 26 12:26:13 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 12:26:13 2015 -0700

--
 python/pyspark/sql/dataframe.py | 19 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 46 
 .../org/apache/spark/sql/DataFrameSuite.scala   |  3 +-
 3 files changed, 48 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/784fcd53/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bf7c47b..d51309f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -520,6 +520,25 @@ class DataFrame(object):
 
 orderBy = sort
 
+    def describe(self, *cols):
+        """Computes statistics for numeric columns.
+
+        This include count, mean, stddev, min, and max. If no columns are
+        given, this function computes statistics for all numerical columns.
+
+        >>> df.describe().show()
+        summary age
+        count   2
+        mean    3.5
+        stddev  1.5
+        min     2
+        max     5
+        """
+        cols = ListConverter().convert(cols,
+                                       self.sql_ctx._sc._gateway._gateway_client)
+        jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols))
+        return DataFrame(jdf, self.sql_ctx)
+
 def head(self, n=None):
  Return the first `n` rows or the first row if n is None.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/784fcd53/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index db56182..4c80359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, 
ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType, StructField, 
StringType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, 
CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,15 +752,17 @@ class DataFrame private[sql](
   }
 
   /**
-   * Compute numerical statistics for given columns of this [[DataFrame]]:
-   * count, mean (avg), stddev (standard deviation), min, max.
-   * Each row of the resulting [[DataFrame]] contains column with statistic 
name
-   * and columns with statistic results for each given column.
-   * If no columns are given then computes for all numerical columns.
+   * Computes statistics for numeric columns, including count, mean, stddev, 
min, and max.
+   * If no columns are given, this function computes statistics for all 
numerical columns.
+   *
+   * This function is meant for exploratory data analysis, as we make no 
guarantee about the
+   * backward compatibility of the schema of the resulting 

spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 aa2d157c6 -> 84735c363


[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin azagre...@gmail.com

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it 
locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF 
without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for 
summary statistics

(cherry picked from commit 5bbcd1304cfebba31ec6857a80d3825a40d02e83)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84735c36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84735c36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84735c36

Branch: refs/heads/branch-1.3
Commit: 84735c363e220baf2cc39dcef5f040812e23c086
Parents: aa2d157
Author: azagrebin azagre...@gmail.com
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 12:25:48 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, 
StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, 
CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }
 
   /**
+   * Compute numerical statistics for given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, max.
+   * Each row of the resulting [[DataFrame]] contains column with statistic name
+   * and columns with statistic results for each given column.
+   * If no columns are given then computes for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
* Returns the first `n` rows.
* @group action
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
--- 

spark git commit: [DOCS][SQL] Fix JDBC example

2015-03-26 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 3d545782e -> 54d92b542


[DOCS][SQL] Fix JDBC example

Author: Michael Armbrust mich...@databricks.com

Closes #5192 from marmbrus/fixJDBCDocs and squashes the following commits:

b48a33d [Michael Armbrust] [DOCS][SQL] Fix JDBC example

(cherry picked from commit aad00322765d6041e817a6bd3fcff2187d212057)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d92b54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d92b54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d92b54

Branch: refs/heads/branch-1.3
Commit: 54d92b5424705215b177c02665c163907c9c7de4
Parents: 3d54578
Author: Michael Armbrust mich...@databricks.com
Authored: Thu Mar 26 14:51:46 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Mar 26 14:51:58 2015 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/54d92b54/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c99a0b0..4441d6a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1406,7 +1406,7 @@ DataFrame jdbcDF = sqlContext.load("jdbc", options)
 
 {% highlight python %}
 
-df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
+df = sqlContext.load(source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
 
 {% endhighlight %}
 





spark git commit: [DOCS][SQL] Fix JDBC example

2015-03-26 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 71a0d40eb -> aad003227


[DOCS][SQL] Fix JDBC example

Author: Michael Armbrust mich...@databricks.com

Closes #5192 from marmbrus/fixJDBCDocs and squashes the following commits:

b48a33d [Michael Armbrust] [DOCS][SQL] Fix JDBC example


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aad00322
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aad00322
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aad00322

Branch: refs/heads/master
Commit: aad00322765d6041e817a6bd3fcff2187d212057
Parents: 71a0d40
Author: Michael Armbrust mich...@databricks.com
Authored: Thu Mar 26 14:51:46 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Mar 26 14:51:46 2015 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aad00322/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c99a0b0..4441d6a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1406,7 +1406,7 @@ DataFrame jdbcDF = sqlContext.load("jdbc", options)
 
 {% highlight python %}
 
-df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
+df = sqlContext.load(source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
 
 {% endhighlight %}
 





spark git commit: [SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

2015-03-26 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master aad003227 -> 39fb57968


[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

Adds a `Graph#minus` method which will return only unique `VertexId`'s from the 
calling `VertexRDD`.

To demonstrate a basic example with pseudocode:

```
Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2)))
> Set((0L,0))
```
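
A hedged, concrete version of that pseudocode against the GraphX API (assumes a running `SparkContext` named `sc` and a Spark build that includes this change):

```
import org.apache.spark.graphx._

val setA = VertexRDD(sc.parallelize(Seq((0L, 0), (1L, 1))))
val setB = VertexRDD(sc.parallelize(Seq((1L, 1), (2L, 2))))

// Keeps only the vertices whose ids do not appear in setB.
val onlyInA = setA.minus(setB)
onlyInA.collect()  // Array((0L, 0))
```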

Author: Brennon York brennon.y...@capitalone.com

Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits:

248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid 
createUsingIndex and updated the mask operations to simplify with andNot call
3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus 
method
6575d92 [Brennon York] updated mima exclude
aaa030b [Brennon York] completed graph#minus functionality
7227c0f [Brennon York] beginning work on minus functionality


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39fb5796
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39fb5796
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39fb5796

Branch: refs/heads/master
Commit: 39fb57968352549f2276ac4fcd2b92988ed6fe42
Parents: aad0032
Author: Brennon York brennon.y...@capitalone.com
Authored: Thu Mar 26 19:08:09 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Mar 26 19:08:09 2015 -0700

--
 docs/graphx-programming-guide.md|  2 ++
 .../org/apache/spark/graphx/VertexRDD.scala | 16 ++
 .../graphx/impl/VertexPartitionBaseOps.scala| 15 +
 .../spark/graphx/impl/VertexRDDImpl.scala   | 25 +++
 .../apache/spark/graphx/VertexRDDSuite.scala| 33 ++--
 project/MimaExcludes.scala  |  3 ++
 6 files changed, 92 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/docs/graphx-programming-guide.md
--
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c601d79..3f10cb2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -899,6 +899,8 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
   // Transform the values without changing the ids (preserves the internal 
index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
   def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
+  // Show only vertices unique to this set based on their VertexId's
+  def minus(other: RDD[(VertexId, VD)])
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate 
joins (substantially)

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0..a9f04b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@ abstract class VertexRDD[VD](
   def mapValues[VD2: ClassTag](f: (VertexId, VD) = VD2): VertexRDD[VD2]
 
   /**
+   * For each VertexId present in both `this` and `other`, minus will act as a 
set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other an RDD to run the set operation against
+   */
+  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * For each VertexId present in both `this` and `other`, minus will act as a 
set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other a VertexRDD to run the set operation against
+   */
+  def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+  /**
* For each vertex present in both `this` and `other`, `diff` returns only 
those vertices with
* differing values; for values that are different, keeps the values from 
`other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548..b90f9fa 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ 

spark git commit: [SPARK-6536] [PySpark] Column.inSet() in Python

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 9edb34fc3 -> 0ba759985


[SPARK-6536] [PySpark] Column.inSet() in Python

```
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(age=5, name=u'Bob')]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(age=2, name=u'Alice')]
```

Author: Davies Liu dav...@databricks.com

Closes #5190 from davies/in and squashes the following commits:

6b73a47 [Davies Liu] Column.inSet() in Python

(cherry picked from commit f535802977c5a3ce45894d89fdf59f8723f023c8)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba75998
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba75998
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba75998

Branch: refs/heads/branch-1.3
Commit: 0ba759985288f5df6940c37f5f401bc31de53a1c
Parents: 9edb34f
Author: Davies Liu dav...@databricks.com
Authored: Thu Mar 26 00:01:24 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 00:01:32 2015 -0700

--
 python/pyspark/sql/dataframe.py | 17 +
 1 file changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ba75998/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5cb89da..bf7c47b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -985,6 +985,23 @@ class Column(object):
 
     __getslice__ = substr
 
+    def inSet(self, *cols):
+        """ A boolean expression that is evaluated to true if the value of this
+        expression is contained by the evaluated values of the arguments.
+
+        >>> df[df.name.inSet("Bob", "Mike")].collect()
+        [Row(age=5, name=u'Bob')]
+        >>> df[df.age.inSet([1, 2, 3])].collect()
+        [Row(age=2, name=u'Alice')]
+        """
+        if len(cols) == 1 and isinstance(cols[0], (list, set)):
+            cols = cols[0]
+        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert(cols, sc._gateway._gateway_client)
+        jc = getattr(self._jc, "in")(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
     # order
     asc = _unary_op("asc", "Returns a sort expression based on the"
                            " ascending order of the given column name.")





spark git commit: [SPARK-6536] [PySpark] Column.inSet() in Python

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 276ef1c3c - f53580297


[SPARK-6536] [PySpark] Column.inSet() in Python

```
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(age=5, name=u'Bob')]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(age=2, name=u'Alice')]
```

Author: Davies Liu dav...@databricks.com

Closes #5190 from davies/in and squashes the following commits:

6b73a47 [Davies Liu] Column.inSet() in Python


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5358029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5358029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5358029

Branch: refs/heads/master
Commit: f535802977c5a3ce45894d89fdf59f8723f023c8
Parents: 276ef1c
Author: Davies Liu dav...@databricks.com
Authored: Thu Mar 26 00:01:24 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 00:01:24 2015 -0700

--
 python/pyspark/sql/dataframe.py | 17 +
 1 file changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5358029/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5cb89da..bf7c47b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -985,6 +985,23 @@ class Column(object):
 
     __getslice__ = substr
 
+    def inSet(self, *cols):
+        """ A boolean expression that is evaluated to true if the value of this
+        expression is contained by the evaluated values of the arguments.
+
+        >>> df[df.name.inSet("Bob", "Mike")].collect()
+        [Row(age=5, name=u'Bob')]
+        >>> df[df.age.inSet([1, 2, 3])].collect()
+        [Row(age=2, name=u'Alice')]
+        """
+        if len(cols) == 1 and isinstance(cols[0], (list, set)):
+            cols = cols[0]
+        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert(cols, sc._gateway._gateway_client)
+        jc = getattr(self._jc, "in")(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
     # order
     asc = _unary_op("asc", "Returns a sort expression based on the"
                            " ascending order of the given column name.")





spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f53580297 - 5bbcd1304


[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin azagre...@gmail.com

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it 
locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF 
without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for 
summary statistics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bbcd130
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bbcd130
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bbcd130

Branch: refs/heads/master
Commit: 5bbcd1304cfebba31ec6857a80d3825a40d02e83
Parents: f535802
Author: azagrebin azagre...@gmail.com
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 00:25:04 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }
 
   /**
+   * Compute numerical statistics for given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, max.
+   * Each row of the resulting [[DataFrame]] contains column with statistic name
+   * and columns with statistic results for each given column.
+   * If no columns are given then computes for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
    * Returns the first `n` rows.
    * @group action
    */
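The `stddevExpr` helper above computes the standard deviation from two averages in a single aggregation pass, using the identity stddev = sqrt(E[x^2] - E[x]^2) (the population form). A standalone Scala sketch of that identity, not part of the commit and using made-up sample values:

```
// Illustrative only: the moment-based stddev behind stddevExpr, on a plain Seq.
object StddevSketch {
  def populationStddev(xs: Seq[Double]): Double = {
    val n = xs.size.toDouble
    val mean = xs.sum / n                           // E[x]
    val meanOfSquares = xs.map(x => x * x).sum / n  // E[x^2]
    math.sqrt(meanOfSquares - mean * mean)          // sqrt(E[x^2] - E[x]^2)
  }

  def main(args: Array[String]): Unit = {
    val ages = Seq(18.0, 45.0, 53.0, 60.0, 92.0)    // made-up sample
    println(f"stddev = ${populationStddev(ages)}%.2f")
  }
}
```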

http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -443,6 +443,51 @@ class DataFrameSuite 

spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5b5f0e2b0 - aa2d157c6


SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more 
correctly. Add a test, and fix existing one accordingly

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments 
(and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge 
conditions more correctly. Add a test, and fix existing one accordingly

(cherry picked from commit fe15ea976073edd738c006af1eb8d31617a039fc)
Signed-off-by: Sean Owen so...@cloudera.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa2d157c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa2d157c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa2d157c

Branch: refs/heads/branch-1.3
Commit: aa2d157c62b8c4d1d5479c92844d9ab1207e4240
Parents: 5b5f0e2
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:32 2015 +

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 20 
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 24 
 2 files changed, 29 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa2d157c/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e66f83b..a3a03ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
         None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }
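To see where the ratio-based formula above places boundary values (including e == max, which the new test below exercises), here is a small standalone sketch, not taken from the commit, that re-implements the bucketing outside Spark:

```
// Illustrative only: local re-implementation of the new bucket computation,
// checked against the counts expected by WorksWithDoubleValuesAtMinMax.
object BucketSketch {
  // Even buckets over [min, max]; a value equal to max falls into the last, end-inclusive bucket.
  def fastBucket(min: Double, max: Double, count: Int)(e: Double): Option[Int] =
    if (e.isNaN || e < min || e > max) None
    else Some(math.min((((e - min) / (max - min)) * count).toInt, count - 1))

  def main(args: Array[String]): Unit = {
    val data = Seq(1.0, 1.0, 1.0, 2.0, 3.0, 3.0)
    val counts = new Array[Long](4)
    data.foreach(e => fastBucket(1.0, 3.0, 4)(e).foreach(i => counts(i) += 1))
    println(counts.mkString(", "))  // 3, 0, 1, 2 -- the two 3.0s land in the last bucket
  }
}
```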

http://git-wip-us.apache.org/repos/asf/spark/blob/aa2d157c/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index de30653..787b06e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -232,6 +232,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }
 
+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works

spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 3ddb975fa - fe15ea976


SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more 
correctly. Add a test, and fix existing one accordingly

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments 
(and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge 
conditions more correctly. Add a test, and fix existing one accordingly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe15ea97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe15ea97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe15ea97

Branch: refs/heads/master
Commit: fe15ea976073edd738c006af1eb8d31617a039fc
Parents: 3ddb975
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:23 2015 +

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 20 
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 24 
 2 files changed, 29 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe15ea97/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 03afc28..71e6e30 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
        None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe15ea97/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 4cd0f97..9707938 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -235,6 +235,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }
 
+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2))
@@ -248,7 +254,7 @@ class DoubleRDDSuite 

spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

2015-03-26 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 61c059a4a - 758ebf77d


SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more 
correctly. Add a test, and fix existing one accordingly

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments 
(and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge 
conditions more correctly. Add a test, and fix existing one accordingly

(cherry picked from commit fe15ea976073edd738c006af1eb8d31617a039fc)
Signed-off-by: Sean Owen so...@cloudera.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/758ebf77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/758ebf77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/758ebf77

Branch: refs/heads/branch-1.2
Commit: 758ebf77d7daded7c5f6f41ee269205bc246d487
Parents: 61c059a
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:42 2015 +

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 20 
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 24 
 2 files changed, 29 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/758ebf77/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e0494ee..e66c06e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -192,25 +192,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
        None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/758ebf77/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index f89bdb6..e29ac0c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -233,6 +233,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }
 
+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works