spark git commit: [SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
Repository: spark
Updated Branches: refs/heads/branch-1.3 0ba759985 -> 825499655

[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo

Author: Michael Armbrust <mich...@databricks.com>

Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following commits:

bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema

(cherry picked from commit f88f51bbd461e0a42ad7021147268509b9c3c56e)
Signed-off-by: Cheng Lian <l...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82549965
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82549965
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82549965
Branch: refs/heads/branch-1.3
Commit: 8254996557512b8bbc8fd35c550004b56144581f
Parents: 0ba7599
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Mar 26 18:46:57 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Mar 26 18:47:15 2015 +0800

--
 .../spark/sql/catalyst/expressions/rows.scala    |  7 +--
 .../org/apache/spark/sql/types/Metadata.scala    |  3 +++
 .../org/apache/spark/sql/types/dataTypes.scala   | 18 ++
 .../spark/sql/execution/SparkSqlSerializer.scala |  4 +---
 .../scala/org/apache/spark/sql/RowSuite.scala    | 12 
 5 files changed, 39 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 8bba26b..a8983df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -66,7 +66,7 @@ object EmptyRow extends Row {
  */
 class GenericRow(protected[sql] val values: Array[Any]) extends Row {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)

   def this(size: Int) = this(new Array[Any](size))

@@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
 }

 class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)

   def this(size: Int) = this(new Array[Any](size))

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index e50e976..6ee24ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
 sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   extends Serializable {

+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Tests whether this Metadata contains a binding for a key. */
   def contains(key: String): Boolean = map.contains(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index d973144..952cf5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
  */
 @DeveloperApi
 case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   private[sql] type JvmType = Decimal
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val numeric = Decimal.DecimalIsFractional
@@ -819,6 +823,10
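Why the no-arg constructors matter: Kryo's default FieldSerializer must instantiate a class before populating its fields on read, and it does so through a no-arg constructor. A minimal self-contained sketch of the round trip, using the Kryo library directly outside Spark (the `Point` class is hypothetical, standing in for GenericRowWithSchema):

```scala
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// A class with only a parameterized constructor makes Kryo's FieldSerializer
// fail on read; the (possibly non-public) no-arg constructor fixes that.
class Point(var x: Int, var y: Int) {
  /** No-arg constructor for serialization. */
  protected def this() = this(0, 0)
}

val kryo = new Kryo()
val bos = new ByteArrayOutputStream()
val out = new Output(bos)
kryo.writeClassAndObject(out, new Point(1, 2))
out.close()

val in = new Input(bos.toByteArray)
val restored = kryo.readClassAndObject(in).asInstanceOf[Point]
in.close()
// restored.x == 1, restored.y == 2
```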
spark git commit: [MLlib]remove unused import
Repository: spark
Updated Branches: refs/heads/master 1c05027a1 -> 3ddb975fa

[MLlib]remove unused import

minor thing. Let me know if jira is required.

Author: Yuhao Yang <hhb...@gmail.com>

Closes #5207 from hhbyyh/adjustImport and squashes the following commits:

2240121 [Yuhao Yang] remove unused import

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ddb975f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ddb975f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ddb975f
Branch: refs/heads/master
Commit: 3ddb975faeddeb2674a7e7f7e80cf90dfbd4d6d2
Parents: 1c05027
Author: Yuhao Yang <hhb...@gmail.com>
Authored: Thu Mar 26 13:27:05 2015 +
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 26 13:27:05 2015 +

--
 mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3ddb975f/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 5e17c8d..9d63a08 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.clustering

 import java.util.Random

-import breeze.linalg.{DenseVector => BDV, normalize, axpy => brzAxpy}
+import breeze.linalg.{DenseVector => BDV, normalize}

 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
spark git commit: [SPARK-6491] Spark will put the current working dir to the CLASSPATH
Repository: spark
Updated Branches: refs/heads/branch-1.3 836c92165 -> 5b5f0e2b0

[SPARK-6491] Spark will put the current working dir to the CLASSPATH

When running bin/compute-classpath.sh, the output will be:

:/spark/conf:/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.5.0-cdh5.2.0.jar:/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/lib_managed/jars/datanucleus-core-3.2.10.jar

Java will add the current working dir to the CLASSPATH if the first ":" exists, which is not expected by Spark users. For example, if I call spark-shell in the folder /root and there exists a core-site.xml under /root/, Spark will use this file as the Hadoop conf file, even if I have already set HADOOP_CONF_DIR=/etc/hadoop/conf.

Author: guliangliang <guliangli...@qiyi.com>

Closes #5156 from marsishandsome/Spark6491 and squashes the following commits:

5ae214f [guliangliang] use appendToClasspath to change CLASSPATH
b21f3b2 [guliangliang] keep the classpath order
5d1f870 [guliangliang] [SPARK-6491] Spark will put the current working dir to the CLASSPATH

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5f0e2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5f0e2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5f0e2b
Branch: refs/heads/branch-1.3
Commit: 5b5f0e2b08941bde2655b1aec9b2ae28c377be78
Parents: 836c921
Author: guliangliang <guliangli...@qiyi.com>
Authored: Thu Mar 26 13:28:56 2015 +
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 26 13:28:56 2015 +

--
 bin/compute-classpath.sh | 81 ++-
 1 file changed, 41 insertions(+), 40 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5f0e2b/bin/compute-classpath.sh

diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index f28..679dfaf 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -25,17 +25,24 @@
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

 . "$FWDIR"/bin/load-spark-env.sh

-if [ -n "$SPARK_CLASSPATH" ]; then
-  CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
-else
-  CLASSPATH="$SPARK_SUBMIT_CLASSPATH"
-fi
+function appendToClasspath(){
+  if [ -n "$1" ]; then
+    if [ -n "$CLASSPATH" ]; then
+      CLASSPATH="$CLASSPATH:$1"
+    else
+      CLASSPATH="$1"
+    fi
+  fi
+}
+
+appendToClasspath "$SPARK_CLASSPATH"
+appendToClasspath "$SPARK_SUBMIT_CLASSPATH"

 # Build up classpath
 if [ -n "$SPARK_CONF_DIR" ]; then
-  CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
+  appendToClasspath "$SPARK_CONF_DIR"
 else
-  CLASSPATH="$CLASSPATH:$FWDIR/conf"
+  appendToClasspath "$FWDIR/conf"
 fi

 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION"

@@ -51,20 +58,20 @@
 if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
     "classes ahead of assembly." 1>&2
   # Spark classes
-  CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath "$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
+  appendToClasspath
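The underlying JVM behavior is worth spelling out: an empty classpath entry (a leading, trailing, or doubled ":") is treated as the current working directory. A minimal sketch of both the failure and the fix's append logic, written in Scala purely for illustration:

```scala
// How the JVM parses CLASSPATH: an empty entry means ".", the CWD.
def entries(classpath: String): Seq[String] =
  classpath.split(":", -1).map(e => if (e.isEmpty) "." else e)

entries(":/spark/conf:/spark/assembly.jar")
// Seq(".", "/spark/conf", "/spark/assembly.jar") -- CWD sneaks in first

// Equivalent of appendToClasspath: never emit the separator before the
// first real entry, so no empty element is ever created.
def append(cp: String, entry: String): String =
  if (entry.isEmpty) cp
  else if (cp.isEmpty) entry
  else s"$cp:$entry"
```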
spark git commit: [SPARK-6468][Block Manager] Fix the race condition of subDirs in DiskBlockManager
Repository: spark
Updated Branches: refs/heads/master f88f51bbd -> 0c88ce541

[SPARK-6468][Block Manager] Fix the race condition of subDirs in DiskBlockManager

There are two race conditions of `subDirs` in `DiskBlockManager`:

1. `getAllFiles` does not use correct locks to read the contents in `subDirs`. Although it's designed for testing, it's still worth adding correct locks to eliminate the race condition.
2. The double-check has a race condition in `getFile(filename: String)`. If a thread finds `subDirs(dirId)(subDirId)` is not null outside the `synchronized` block, it may not be able to see the correct content of the File instance pointed to by `subDirs(dirId)(subDirId)` according to the Java memory model (there is no volatile variable here).

This PR fixed the above race conditions.

Author: zsxwing <zsxw...@gmail.com>

Closes #5136 from zsxwing/SPARK-6468 and squashes the following commits:

cbb872b [zsxwing] Fix the race condition of subDirs in DiskBlockManager

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88ce54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88ce54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88ce54
Branch: refs/heads/master
Commit: 0c88ce5416d7687bc806a7655e17009ad5823d30
Parents: f88f51b
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Mar 26 12:54:48 2015 +
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 26 12:54:48 2015 +

--
 .../apache/spark/storage/DiskBlockManager.scala | 32 +++-
 1 file changed, 18 insertions(+), 14 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0c88ce54/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea..2883137 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
+  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+  // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

   private val shutdownHook = addShutdownHook()

@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

     // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-          if (!newDir.exists() && !newDir.mkdir()) {
-            throw new IOException(s"Failed to create local dir in $newDir.")
-          }
-          subDirs(dirId)(subDirId) = newDir
-          newDir
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
+        }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
       }
     }

@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatten.filter(_ != null).flatMap { dir =>
+    subDirs.flatMap { dir =>
+      dir.synchronized {
+        // Copy the content of dir because it may be modified in other threads
+        dir.clone()
+      }
+    }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
       if (files != null) files else Seq.empty
     }
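The fixed pattern simply takes the per-subdirectory lock before both the read and the write, dropping the unsynchronized first read of the old double-check. A standalone sketch of the same idiom with hypothetical names:

```scala
import java.io.File

// Lazily populate one slot of a shared Array[File] safely: every read and
// write of a slot happens under the same lock, so no half-published File
// instance can ever be observed (no volatile needed).
val slots = new Array[File](16)

def getOrCreate(i: Int, create: () => File): File = slots.synchronized {
  val old = slots(i)
  if (old != null) old
  else {
    val created = create()
    slots(i) = created
    created
  }
}
```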
spark git commit: [SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns
Repository: spark
Updated Branches: refs/heads/master 0c88ce541 -> 1c05027a1

[SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

Currently in the parquet relation 2 implementation, an error is thrown in case the merged schema is not exactly the same as the metastore schema. But to support cases like deletion of a column using the replace column command, we can relax the restriction so that even if the metastore schema is a subset of the merged parquet schema, the query will work.

Author: Yash Datta <yash.da...@guavus.com>

Closes #5141 from saucam/replace_col and squashes the following commits:

e858d5b [Yash Datta] SPARK-6471: Fix test cases, add a new test case for metastore schema to be subset of parquet schema
5f2f467 [Yash Datta] SPARK-6471: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c05027a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c05027a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c05027a
Branch: refs/heads/master
Commit: 1c05027a143d1b0bf3df192984e6cac752b1e926
Parents: 0c88ce5
Author: Yash Datta <yash.da...@guavus.com>
Authored: Thu Mar 26 21:13:38 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Mar 26 21:13:38 2015 +0800

--
 .../org/apache/spark/sql/parquet/newParquet.scala |  5 +++--
 .../spark/sql/parquet/ParquetSchemaSuite.scala    | 18 --
 2 files changed, 19 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1c05027a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 410600b..3516cfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -758,12 +758,13 @@ private[sql] object ParquetRelation2 extends Logging {
          |${parquetSchema.prettyJson}
        """.stripMargin

-    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
+    assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)

     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
+    val reorderedParquetSchema = parquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
       // Uses Parquet field names but retains Metastore data types.

http://git-wip-us.apache.org/repos/asf/spark/blob/1c05027a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 321832c..8462f9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -212,8 +212,11 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("UPPERCase", IntegerType, nullable = true))))
     }

-    // Conflicting field count
-    assert(intercept[Throwable] {
+    // MetaStore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
       ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),

         StructType(Seq(
           StructField("lowerCase", BinaryType),
@@ -221,6 +224,17 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
           StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))

     // Conflicting field names
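The key change is the `sortBy` fallback: Parquet columns missing from the metastore schema now sort past the end instead of throwing on the ordinal lookup, so they are never zipped against a metastore field. A minimal sketch of that reordering using plain Spark SQL types (column names are made up):

```scala
import org.apache.spark.sql.types._

// Metastore schema is a subset of the (merged) Parquet schema.
val metastoreSchema = StructType(Seq(StructField("a", IntegerType)))
val parquetSchema = StructType(Seq(
  StructField("dropped", StringType), // column removed via REPLACE COLUMNS
  StructField("A", IntegerType)))     // same column, different case

val ordinalMap = metastoreSchema.zipWithIndex.map {
  case (field, index) => field.name.toLowerCase -> index
}.toMap

// Unknown columns get an ordinal past the end, so they sort last.
val reordered = parquetSchema.sortBy(f =>
  ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

// Zip truncates at the shorter (metastore) schema, dropping "dropped".
val merged = StructType(metastoreSchema.zip(reordered).map {
  case (mf, pf) => StructField(pf.name, mf.dataType, mf.nullable)
})
// merged: StructType(StructField("A", IntegerType, true))
```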
spark git commit: [SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns
Repository: spark
Updated Branches: refs/heads/branch-1.3 825499655 -> 836c92165

[SQL][SPARK-6471]: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

Currently in the parquet relation 2 implementation, an error is thrown in case the merged schema is not exactly the same as the metastore schema. But to support cases like deletion of a column using the replace column command, we can relax the restriction so that even if the metastore schema is a subset of the merged parquet schema, the query will work.

Author: Yash Datta <yash.da...@guavus.com>

Closes #5141 from saucam/replace_col and squashes the following commits:

e858d5b [Yash Datta] SPARK-6471: Fix test cases, add a new test case for metastore schema to be subset of parquet schema
5f2f467 [Yash Datta] SPARK-6471: Metastore schema should only be a subset of parquet schema to support dropping of columns using replace columns

(cherry picked from commit 1c05027a143d1b0bf3df192984e6cac752b1e926)
Signed-off-by: Cheng Lian <l...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/836c9216
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/836c9216
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/836c9216
Branch: refs/heads/branch-1.3
Commit: 836c9216599b676ae8f421384f4f20fd35e8c53b
Parents: 8254996
Author: Yash Datta <yash.da...@guavus.com>
Authored: Thu Mar 26 21:13:38 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Mar 26 21:14:15 2015 +0800

--
 .../org/apache/spark/sql/parquet/newParquet.scala |  5 +++--
 .../spark/sql/parquet/ParquetSchemaSuite.scala    | 18 --
 2 files changed, 19 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/836c9216/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 410600b..3516cfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -758,12 +758,13 @@ private[sql] object ParquetRelation2 extends Logging {
          |${parquetSchema.prettyJson}
        """.stripMargin

-    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
+    assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)

     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
+    val reorderedParquetSchema = parquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
       // Uses Parquet field names but retains Metastore data types.

http://git-wip-us.apache.org/repos/asf/spark/blob/836c9216/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 321832c..8462f9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -212,8 +212,11 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("UPPERCase", IntegerType, nullable = true))))
     }

-    // Conflicting field count
-    assert(intercept[Throwable] {
+    // MetaStore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
       ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),

         StructType(Seq(
           StructField("lowerCase", BinaryType),
@@ -221,6 +224,17 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
           StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))
spark git commit: [SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.
Repository: spark
Updated Branches: refs/heads/master 39fb57968 -> 49d2ec63e

[SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.

Kryo buffers are backed by byte arrays, but primitive arrays can only be up to 2GB in size. It is misleading to allow users to set buffers past this size.

Author: mcheah <mch...@palantir.com>

Closes #5218 from mccheah/feature/limit-kryo-buffer and squashes the following commits:

1d6d1be [mcheah] Fixing numeric typo
e2e30ce [mcheah] Removing explicit int and double type to match style
09fd80b [mcheah] Should be >= not >. Slightly more consistent error message.
60634f9 [mcheah] [SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49d2ec63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49d2ec63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49d2ec63
Branch: refs/heads/master
Commit: 49d2ec63eccec8a3a78b15b583c36f84310fc6f0
Parents: 39fb579
Author: mcheah <mch...@palantir.com>
Authored: Thu Mar 26 22:48:42 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Thu Mar 26 22:48:42 2015 -0700

--
 .../apache/spark/serializer/KryoSerializer.scala | 16 +---
 1 file changed, 13 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/49d2ec63/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f83bcaa..579fb66 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -49,10 +49,20 @@
 class KryoSerializer(conf: SparkConf)
   extends org.apache.spark.serializer.Serializer
   with Logging
   with Serializable {

-  private val bufferSize =
-    (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt
+  private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
+  if (bufferSizeMb >= 2048) {
+    throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
+      s"2048 mb, got: + $bufferSizeMb mb.")
+  }
+  private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
+
+  val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
+  if (maxBufferSizeMb >= 2048) {
+    throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
+      s"2048 mb, got: + $maxBufferSizeMb mb.")
+  }
+  private val maxBufferSize = maxBufferSizeMb * 1024 * 1024

-  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val userRegistrator = conf.getOption("spark.kryo.registrator")
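For illustration, the validated settings are plain SparkConf entries; a sketch of configuring them within the new 2 GB bound (the values here are arbitrary examples):

```scala
import org.apache.spark.SparkConf

// Both sizes are in megabytes and, after this patch, must stay below 2048 MB
// because each Kryo buffer is backed by a single Java byte array.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "0.064")   // initial buffer: 64 KB
  .set("spark.kryoserializer.buffer.max.mb", "512") // hard cap per record
```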
spark git commit: SPARK-6532 [BUILD] LDAModel.scala fails scalastyle on Windows
Repository: spark
Updated Branches: refs/heads/master fe15ea976 -> c3a52a082

SPARK-6532 [BUILD] LDAModel.scala fails scalastyle on Windows

Use standard UTF-8 source / report encoding for scalastyle

Author: Sean Owen <so...@cloudera.com>

Closes #5211 from srowen/SPARK-6532 and squashes the following commits:

16a33e5 [Sean Owen] Use standard UTF-8 source / report encoding for scalastyle

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3a52a08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3a52a08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3a52a08
Branch: refs/heads/master
Commit: c3a52a08248db08eade29b265f02483144a282d6
Parents: fe15ea9
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Mar 26 10:52:31 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 10:52:31 2015 -0700

--
 pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c3a52a08/pom.xml

diff --git a/pom.xml b/pom.xml
index 23bb161..b3cecd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1452,7 +1452,8 @@
           <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
           <configLocation>scalastyle-config.xml</configLocation>
           <outputFile>scalastyle-output.xml</outputFile>
-          <outputEncoding>UTF-8</outputEncoding>
+          <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+          <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
         </configuration>
         <executions>
           <execution>
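The failure mode is worth spelling out: on Windows the JVM default charset is typically windows-1252 rather than UTF-8, so a tool that reads UTF-8 sources without an explicit input encoding can mangle any non-ASCII characters in the source (which LDAModel.scala likely contains in its scaladoc). A small sketch of pinning the encoding when reading a source file, independent of platform defaults; the file path is illustrative:

```scala
import java.nio.charset.Charset
import scala.io.Source

// The default charset differs per platform (often windows-1252 on Windows),
// which is exactly why build plugins should be handed an explicit encoding.
println(s"platform default: ${Charset.defaultCharset()}")

val source = Source.fromFile(
  "mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala",
  "UTF-8")
try println(s"${source.getLines().length} lines read") finally source.close()
```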
spark git commit: [SPARK-6554] [SQL] Don't push down predicates which reference partition column(s)
Repository: spark
Updated Branches: refs/heads/master 784fcd532 -> 71a0d40eb

[SPARK-6554] [SQL] Don't push down predicates which reference partition column(s)

There are two cases for the new Parquet data source:

1. Partition columns exist in the Parquet data files. We don't need to push down these predicates, since partition pruning already handles them.
2. Partition columns don't exist in the Parquet data files. We can't push down these predicates, since they are considered invalid columns by Parquet.

Author: Cheng Lian <l...@databricks.com>

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition column(s)

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71a0d40e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71a0d40e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71a0d40e
Branch: refs/heads/master
Commit: 71a0d40ebd37c80d8020e184366778b57c762285
Parents: 784fcd5
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Mar 26 13:11:37 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Mar 26 13:11:37 2015 -0700

--
 .../org/apache/spark/sql/parquet/newParquet.scala | 17 -
 .../spark/sql/parquet/ParquetFilterSuite.scala    | 17 +
 2 files changed, 29 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3516cfe..0d68810 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -435,11 +435,18 @@ private[sql] case class ParquetRelation2(
     // Push down filters when possible. Notice that not all filters can be converted to Parquet
     // filter predicate. Here we try to convert each individual predicate and only collect those
     // convertible ones.
-    predicates
-      .flatMap(ParquetFilters.createFilter)
-      .reduceOption(FilterApi.and)
-      .filter(_ => sqlContext.conf.parquetFilterPushDown)
-      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    if (sqlContext.conf.parquetFilterPushDown) {
+      predicates
+        // Don't push down predicates which reference partition columns
+        .filter { pred =>
+          val partitionColNames = partitionColumns.map(_.name).toSet
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
+        .flatMap(ParquetFilters.createFilter)
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    }

     if (isPartitioned) {
       logInfo {

http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4d32e84..6a2c2a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -321,6 +321,23 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6554: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.parquetFile(path).filter("part = 1"),
+          (1 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }

 class ParquetDataSourceOffFilterSuite extends
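The guard itself is just set arithmetic on column names. A minimal standalone sketch, with a hypothetical predicate type standing in for Catalyst expressions:

```scala
// Hypothetical minimal model of the pushdown guard: a predicate may only be
// handed to Parquet if it references no partition columns, since those
// columns don't exist inside the data files.
case class Pred(references: Set[String])

def pushable(predicates: Seq[Pred], partitionColumns: Set[String]): Seq[Pred] =
  predicates.filter(_.references.intersect(partitionColumns).isEmpty)

// pushable(Seq(Pred(Set("part")), Pred(Set("a"))), Set("part"))
//   == Seq(Pred(Set("a")))
```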
spark git commit: [SPARK-6117] [SQL] Improvements to DataFrame.describe()
Repository: spark
Updated Branches: refs/heads/branch-1.3 84735c363 -> 28e3a1e34

[SPARK-6117] [SQL] Improvements to DataFrame.describe()

1. Slight modifications to the code to make it more readable.
2. Added Python implementation.
3. Updated the documentation to state that we don't guarantee the output schema for this function and it should only be used for exploratory data analysis.

Author: Reynold Xin <r...@databricks.com>

Closes #5201 from rxin/df-describe and squashes the following commits:

25a7834 [Reynold Xin] Reset run-tests.
6abdfee [Reynold Xin] [SPARK-6117] [SQL] Improvements to DataFrame.describe()

(cherry picked from commit 784fcd532784fcfd9bf0a1db71c9f71c469ee716)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28e3a1e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28e3a1e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28e3a1e3
Branch: refs/heads/branch-1.3
Commit: 28e3a1e34db2f6e16891d6ae154db10c8170e5e4
Parents: 84735c3
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Mar 26 12:26:13 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 12:26:26 2015 -0700

--
 python/pyspark/sql/dataframe.py                 | 19 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 46 
 .../org/apache/spark/sql/DataFrameSuite.scala   |  3 +-
 3 files changed, 48 insertions(+), 20 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/28e3a1e3/python/pyspark/sql/dataframe.py

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bf7c47b..d51309f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -520,6 +520,25 @@ class DataFrame(object):

     orderBy = sort

+    def describe(self, *cols):
+        """Computes statistics for numeric columns.
+
+        This includes count, mean, stddev, min, and max. If no columns are
+        given, this function computes statistics for all numerical columns.
+
+        >>> df.describe().show()
+        summary age
+        count   2
+        mean    3.5
+        stddev  1.5
+        min     2
+        max     5
+        """
+        cols = ListConverter().convert(cols,
+                                       self.sql_ctx._sc._gateway._gateway_client)
+        jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols))
+        return DataFrame(jdf, self.sql_ctx)
+
     def head(self, n=None):
         """ Return the first `n` rows or the first row if n is None.

http://git-wip-us.apache.org/repos/asf/spark/blob/28e3a1e3/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index db56182..4c80359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils

@@ -752,15 +752,17 @@ class DataFrame private[sql](
   }

   /**
-   * Compute numerical statistics for given columns of this [[DataFrame]]:
-   * count, mean (avg), stddev (standard deviation), min, max.
-   * Each row of the resulting [[DataFrame]] contains column with statistic name
-   * and columns with statistic results for each given column.
-   * If no columns are given then computes for all numerical columns.
+   * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
+   * If no columns are given, this function computes statistics for all numerical columns.
+   *
+   * This function is meant for
spark git commit: [SPARK-6117] [SQL] Improvements to DataFrame.describe()
Repository: spark
Updated Branches: refs/heads/master c3a52a082 -> 784fcd532

[SPARK-6117] [SQL] Improvements to DataFrame.describe()

1. Slight modifications to the code to make it more readable.
2. Added Python implementation.
3. Updated the documentation to state that we don't guarantee the output schema for this function and it should only be used for exploratory data analysis.

Author: Reynold Xin <r...@databricks.com>

Closes #5201 from rxin/df-describe and squashes the following commits:

25a7834 [Reynold Xin] Reset run-tests.
6abdfee [Reynold Xin] [SPARK-6117] [SQL] Improvements to DataFrame.describe()

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/784fcd53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/784fcd53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/784fcd53
Branch: refs/heads/master
Commit: 784fcd532784fcfd9bf0a1db71c9f71c469ee716
Parents: c3a52a0
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Mar 26 12:26:13 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 12:26:13 2015 -0700

--
 python/pyspark/sql/dataframe.py                 | 19 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 46 
 .../org/apache/spark/sql/DataFrameSuite.scala   |  3 +-
 3 files changed, 48 insertions(+), 20 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/784fcd53/python/pyspark/sql/dataframe.py

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bf7c47b..d51309f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -520,6 +520,25 @@ class DataFrame(object):

     orderBy = sort

+    def describe(self, *cols):
+        """Computes statistics for numeric columns.
+
+        This includes count, mean, stddev, min, and max. If no columns are
+        given, this function computes statistics for all numerical columns.
+
+        >>> df.describe().show()
+        summary age
+        count   2
+        mean    3.5
+        stddev  1.5
+        min     2
+        max     5
+        """
+        cols = ListConverter().convert(cols,
+                                       self.sql_ctx._sc._gateway._gateway_client)
+        jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols))
+        return DataFrame(jdf, self.sql_ctx)
+
     def head(self, n=None):
         """ Return the first `n` rows or the first row if n is None.

http://git-wip-us.apache.org/repos/asf/spark/blob/784fcd53/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index db56182..4c80359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils

@@ -752,15 +752,17 @@ class DataFrame private[sql](
   }

   /**
-   * Compute numerical statistics for given columns of this [[DataFrame]]:
-   * count, mean (avg), stddev (standard deviation), min, max.
-   * Each row of the resulting [[DataFrame]] contains column with statistic name
-   * and columns with statistic results for each given column.
-   * If no columns are given then computes for all numerical columns.
+   * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
+   * If no columns are given, this function computes statistics for all numerical columns.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting
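A quick usage sketch of the Scala side, mirroring the Python doctest above (hypothetical data; a `sqlContext` is assumed in scope, as in the Spark 1.3 shell):

```scala
import sqlContext.implicits._

// describe() summarizes only numeric columns; the data here is made up.
val df = Seq((2, "Alice"), (5, "Bob")).toDF("age", "name")
df.describe("age").show()
// summary age
// count   2
// mean    3.5
// stddev  1.5
// min     2
// max     5
```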
spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...
Repository: spark
Updated Branches: refs/heads/branch-1.3 aa2d157c6 -> 84735c363

[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin <azagre...@gmail.com>

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for summary statistics

(cherry picked from commit 5bbcd1304cfebba31ec6857a80d3825a40d02e83)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84735c36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84735c36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84735c36
Branch: refs/heads/branch-1.3
Commit: 84735c363e220baf2cc39dcef5f040812e23c086
Parents: aa2d157
Author: azagrebin <azagre...@gmail.com>
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 12:25:48 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala  | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils

@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }

   /**
+   * Compute numerical statistics for given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, max.
+   * Each row of the resulting [[DataFrame]] contains column with statistic name
+   * and columns with statistic results for each given column.
+   * If no columns are given then computes for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
    * Returns the first `n` rows.
    * @group action
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
---
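The `stddevExpr` above builds the population standard deviation out of the existing Average, Multiply, Subtract, and Sqrt expressions, using the identity Var(X) = E[X^2] - (E[X])^2. The same computation in plain Scala, for reference:

```scala
// Population standard deviation via E[x^2] - (E[x])^2; numerically this
// matches what the Catalyst expression tree above evaluates.
def stddev(xs: Seq[Double]): Double = {
  val n = xs.size.toDouble
  val mean = xs.sum / n
  val meanOfSquares = xs.map(x => x * x).sum / n
  math.sqrt(meanOfSquares - mean * mean)
}

// stddev(Seq(2.0, 5.0)) == 1.5, matching the Python doctest above.
```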
spark git commit: [DOCS][SQL] Fix JDBC example
Repository: spark
Updated Branches: refs/heads/branch-1.3 3d545782e -> 54d92b542

[DOCS][SQL] Fix JDBC example

Author: Michael Armbrust <mich...@databricks.com>

Closes #5192 from marmbrus/fixJDBCDocs and squashes the following commits:

b48a33d [Michael Armbrust] [DOCS][SQL] Fix JDBC example

(cherry picked from commit aad00322765d6041e817a6bd3fcff2187d212057)
Signed-off-by: Michael Armbrust <mich...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d92b54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d92b54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d92b54
Branch: refs/heads/branch-1.3
Commit: 54d92b5424705215b177c02665c163907c9c7de4
Parents: 3d54578
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Mar 26 14:51:46 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Mar 26 14:51:58 2015 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/54d92b54/docs/sql-programming-guide.md

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c99a0b0..4441d6a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1406,7 +1406,7 @@ DataFrame jdbcDF = sqlContext.load("jdbc", options)

 {% highlight python %}

-df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
+df = sqlContext.load(source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")

 {% endhighlight %}
spark git commit: [DOCS][SQL] Fix JDBC example
Repository: spark
Updated Branches: refs/heads/master 71a0d40eb -> aad003227

[DOCS][SQL] Fix JDBC example

Author: Michael Armbrust <mich...@databricks.com>

Closes #5192 from marmbrus/fixJDBCDocs and squashes the following commits:

b48a33d [Michael Armbrust] [DOCS][SQL] Fix JDBC example

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aad00322
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aad00322
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aad00322
Branch: refs/heads/master
Commit: aad00322765d6041e817a6bd3fcff2187d212057
Parents: 71a0d40
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Mar 26 14:51:46 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Mar 26 14:51:46 2015 -0700

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/aad00322/docs/sql-programming-guide.md

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c99a0b0..4441d6a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1406,7 +1406,7 @@ DataFrame jdbcDF = sqlContext.load("jdbc", options)

 {% highlight python %}

-df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
+df = sqlContext.load(source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")

 {% endhighlight %}
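For completeness, the equivalent Scala call in the 1.3-era API takes the source name plus an options map; a sketch, with the same placeholder connection values as the docs example:

```scala
// SQLContext.load(source, options) as of Spark 1.3; the URL and table
// names are placeholders, matching the documentation example above.
val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename"))
```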
spark git commit: [SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference
Repository: spark
Updated Branches: refs/heads/master aad003227 -> 39fb57968

[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

Adds a `Graph#minus` method which will return only unique `VertexId`'s from the calling `VertexRDD`.

To demonstrate a basic example with pseudocode:

```
Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2)))
> Set((0L,0))
```

Author: Brennon York <brennon.y...@capitalone.com>

Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits:

248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid createUsingIndex and updated the mask operations to simplify with andNot call
3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus method
6575d92 [Brennon York] updated mima exclude
aaa030b [Brennon York] completed graph#minus functionality
7227c0f [Brennon York] beginning work on minus functionality

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39fb5796
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39fb5796
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39fb5796
Branch: refs/heads/master
Commit: 39fb57968352549f2276ac4fcd2b92988ed6fe42
Parents: aad0032
Author: Brennon York <brennon.y...@capitalone.com>
Authored: Thu Mar 26 19:08:09 2015 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Thu Mar 26 19:08:09 2015 -0700

--
 docs/graphx-programming-guide.md                |  2 ++
 .../org/apache/spark/graphx/VertexRDD.scala     | 16 ++
 .../graphx/impl/VertexPartitionBaseOps.scala    | 15 +
 .../spark/graphx/impl/VertexRDDImpl.scala       | 25 +++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 33 ++--
 project/MimaExcludes.scala                      |  3 ++
 6 files changed, 92 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/docs/graphx-programming-guide.md

diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c601d79..3f10cb2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -899,6 +899,8 @@
 class VertexRDD[VD] extends RDD[(VertexID, VD)] {
   // Transform the values without changing the ids (preserves the internal index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
   def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
+  // Show only vertices unique to this set based on their VertexId's
+  def minus(other: RDD[(VertexId, VD)])
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate joins (substantially)

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0..a9f04b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@
   def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

   /**
+   * For each VertexId present in both `this` and `other`, minus will act as a set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other an RDD to run the set operation against
+   */
+  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * For each VertexId present in both `this` and `other`, minus will act as a set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other a VertexRDD to run the set operation against
+   */
+  def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+  /**
    * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
    * differing values; for values that are different, keeps the values from `other`. This is
    * only guaranteed to work if the VertexRDDs share a common ancestor.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548..b90f9fa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++
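A usage sketch of the new method, turning the commit message's pseudocode into runnable GraphX code (assumes an active SparkContext `sc`, e.g. the Spark shell):

```scala
import org.apache.spark.graphx._

// minus keeps only the VertexIds unique to the left-hand set.
val setA = VertexRDD(sc.parallelize(Seq((0L, 0), (1L, 1))))
val setB = VertexRDD(sc.parallelize(Seq((1L, 1), (2L, 2))))
setA.minus(setB).collect()  // Array((0L, 0))
```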
spark git commit: [SPARK-6536] [PySpark] Column.inSet() in Python
Repository: spark
Updated Branches: refs/heads/branch-1.3 9edb34fc3 -> 0ba759985

[SPARK-6536] [PySpark] Column.inSet() in Python

```
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(age=5, name=u'Bob')]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(age=2, name=u'Alice')]
```

Author: Davies Liu <dav...@databricks.com>

Closes #5190 from davies/in and squashes the following commits:

6b73a47 [Davies Liu] Column.inSet() in Python

(cherry picked from commit f535802977c5a3ce45894d89fdf59f8723f023c8)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba75998
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba75998
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba75998
Branch: refs/heads/branch-1.3
Commit: 0ba759985288f5df6940c37f5f401bc31de53a1c
Parents: 9edb34f
Author: Davies Liu <dav...@databricks.com>
Authored: Thu Mar 26 00:01:24 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 00:01:32 2015 -0700

--
 python/pyspark/sql/dataframe.py | 17 +
 1 file changed, 17 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba75998/python/pyspark/sql/dataframe.py

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5cb89da..bf7c47b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -985,6 +985,23 @@ class Column(object):

     __getslice__ = substr

+    def inSet(self, *cols):
+        """ A boolean expression that is evaluated to true if the value of this
+        expression is contained by the evaluated values of the arguments.
+
+        >>> df[df.name.inSet("Bob", "Mike")].collect()
+        [Row(age=5, name=u'Bob')]
+        >>> df[df.age.inSet([1, 2, 3])].collect()
+        [Row(age=2, name=u'Alice')]
+        """
+        if len(cols) == 1 and isinstance(cols[0], (list, set)):
+            cols = cols[0]
+        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert(cols, sc._gateway._gateway_client)
+        jc = getattr(self._jc, "in")(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
     # order
     asc = _unary_op("asc", "Returns a sort expression based on the ascending order of the given column name.")
spark git commit: [SPARK-6536] [PySpark] Column.inSet() in Python
Repository: spark Updated Branches: refs/heads/master 276ef1c3c -> f53580297

[SPARK-6536] [PySpark] Column.inSet() in Python

```
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(age=5, name=u'Bob')]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(age=2, name=u'Alice')]
```

Author: Davies Liu dav...@databricks.com

Closes #5190 from davies/in and squashes the following commits:

6b73a47 [Davies Liu] Column.inSet() in Python

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5358029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5358029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5358029

Branch: refs/heads/master
Commit: f535802977c5a3ce45894d89fdf59f8723f023c8
Parents: 276ef1c
Author: Davies Liu dav...@databricks.com
Authored: Thu Mar 26 00:01:24 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 00:01:24 2015 -0700

--
 python/pyspark/sql/dataframe.py | 17 +
 1 file changed, 17 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f5358029/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5cb89da..bf7c47b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -985,6 +985,23 @@ class Column(object):

     __getslice__ = substr

+    def inSet(self, *cols):
+        """ A boolean expression that is evaluated to true if the value of this
+        expression is contained by the evaluated values of the arguments.
+
+        >>> df[df.name.inSet("Bob", "Mike")].collect()
+        [Row(age=5, name=u'Bob')]
+        >>> df[df.age.inSet([1, 2, 3])].collect()
+        [Row(age=2, name=u'Alice')]
+        """
+        if len(cols) == 1 and isinstance(cols[0], (list, set)):
+            cols = cols[0]
+        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert(cols, sc._gateway._gateway_client)
+        jc = getattr(self._jc, "in")(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
     # order
     asc = _unary_op("asc", "Returns a sort expression based on the ascending order of the given column name.")
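On the JVM side, `inSet` simply forwards to the Scala `Column`'s `in` method through Py4J. A rough Scala-side equivalent of the doctest above (a sketch, assuming Spark 1.3's `Column.in(list: Column*)` and a DataFrame `df` with `name`/`age` columns):

```
import org.apache.spark.sql.functions.lit

// Same predicates the Python doctests build, expressed directly in Scala.
val bobOrMike = df.filter(df("name").in(lit("Bob"), lit("Mike")))
val youngOnes = df.filter(df("age").in(lit(1), lit(2), lit(3)))
```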
spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...
Repository: spark Updated Branches: refs/heads/master f53580297 -> 5bbcd1304

[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin azagre...@gmail.com

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for summary statistics

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bbcd130
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bbcd130
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bbcd130

Branch: refs/heads/master
Commit: 5bbcd1304cfebba31ec6857a80d3825a40d02e83
Parents: f535802
Author: azagrebin azagre...@gmail.com
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Mar 26 00:25:04 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala  | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils

@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }

   /**
+   * Compute numerical statistics for given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, max.
+   * Each row of the resulting [[DataFrame]] contains a column with the statistic name
+   * and columns with the statistic results for each given column.
+   * If no columns are given, computes for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
    * Returns the first `n` rows.
    * @group action
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -443,6 +443,51 @@ class DataFrameSuite
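The `stddevExpr` helper above leans on the identity Var(X) = E[X^2] - E[X]^2, which lets all five statistics come out of a single aggregation pass. A plain-Scala check of that algebra (sample values are hypothetical, not from the commit):

```
// Population standard deviation via sqrt(E[x^2] - E[x]^2), the same form
// stddevExpr builds out of Catalyst's Average/Multiply/Subtract/Sqrt nodes.
val xs = Seq(18.0, 25.0, 92.0)
val mean = xs.sum / xs.size
val meanOfSquares = xs.map(x => x * x).sum / xs.size
val stddev = math.sqrt(meanOfSquares - mean * mean)
// Note: this is the population form (divide by n, not n - 1).
```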
spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases
Repository: spark Updated Branches: refs/heads/branch-1.3 5b5f0e2b0 -> aa2d157c6

SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix the existing one accordingly.

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments (and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix existing one accordingly

(cherry picked from commit fe15ea976073edd738c006af1eb8d31617a039fc)
Signed-off-by: Sean Owen so...@cloudera.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa2d157c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa2d157c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa2d157c

Branch: refs/heads/branch-1.3
Commit: aa2d157c62b8c4d1d5479c92844d9ab1207e4240
Parents: 5b5f0e2
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +0000
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:32 2015 +0000

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala | 20
 .../org/apache/spark/rdd/DoubleRDDSuite.scala | 24
 2 files changed, 29 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/aa2d157c/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e66f83b..a3a03ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
         None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa2d157c/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index de30653..787b06e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -232,6 +232,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }

+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket
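To make the corrected logic concrete, here is the new bucket math as a standalone function (a sketch distilled from the diff above, not the committed code verbatim):

```
// Bucket index for `count` evenly spaced buckets over [min, max].
def bucketOf(e: Double, min: Double, max: Double, count: Int): Option[Int] =
  if (e.isNaN || e < min || e > max) {
    None
  } else {
    // Ratio first for precision; e == max would yield `count`,
    // so clamp it into the last, end-inclusive bucket.
    Some(math.min((((e - min) / (max - min)) * count).toInt, count - 1))
  }

bucketOf(3.0, 1.0, 3.0, 4)  // Some(3): max lands in the last bucket
bucketOf(0.5, 1.0, 3.0, 4)  // None: below the range
```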
spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases
Repository: spark Updated Branches: refs/heads/master 3ddb975fa -> fe15ea976

SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix the existing one accordingly.

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments (and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix existing one accordingly

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe15ea97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe15ea97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe15ea97

Branch: refs/heads/master
Commit: fe15ea976073edd738c006af1eb8d31617a039fc
Parents: 3ddb975
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +0000
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:23 2015 +0000

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala | 20
 .../org/apache/spark/rdd/DoubleRDDSuite.scala | 24
 2 files changed, 29 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fe15ea97/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 03afc28..71e6e30 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
         None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe15ea97/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 4cd0f97..9707938 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -235,6 +235,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }

+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2))
@@ -248,7 +254,7 @@ class DoubleRDDSuite
spark git commit: SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases
Repository: spark Updated Branches: refs/heads/branch-1.2 61c059a4a -> 758ebf77d

SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases

Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix the existing one accordingly.

Author: Sean Owen so...@cloudera.com

Closes #5148 from srowen/SPARK-6480 and squashes the following commits:

974a0a0 [Sean Owen] Additional test of huge ranges, and a few more comments (and comment fixes)
23ec01e [Sean Owen] Fix fastBucketFunction for histogram() to handle edge conditions more correctly. Add a test, and fix existing one accordingly

(cherry picked from commit fe15ea976073edd738c006af1eb8d31617a039fc)
Signed-off-by: Sean Owen so...@cloudera.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/758ebf77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/758ebf77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/758ebf77

Branch: refs/heads/branch-1.2
Commit: 758ebf77d7daded7c5f6f41ee269205bc246d487
Parents: 61c059a
Author: Sean Owen so...@cloudera.com
Authored: Thu Mar 26 15:00:23 2015 +0000
Committer: Sean Owen so...@cloudera.com
Committed: Thu Mar 26 15:00:42 2015 +0000

--
 .../apache/spark/rdd/DoubleRDDFunctions.scala | 20
 .../org/apache/spark/rdd/DoubleRDDSuite.scala | 24
 2 files changed, 29 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/758ebf77/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e0494ee..e66c06e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -192,25 +192,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
        None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
       }
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
     // once rather than once per shard
     val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
     } else {
       basicBucketFunction _
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/758ebf77/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index f89bdb6..e29ac0c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -233,6 +233,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }

+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket
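As a closing illustration, the behavior the new test pins down, exercised through the public API (assumes an existing SparkContext `sc`):

```
// Histogram over four evenly spaced buckets on [1.0, 3.0]; the two 3.0s
// belong to the final, end-inclusive bucket, per the WorksWithDoubleValuesAtMinMax test.
val rdd = sc.parallelize(Seq(1.0, 1.0, 1.0, 2.0, 3.0, 3.0))
val (buckets, counts) = rdd.histogram(4)
// buckets: Array(1.0, 1.5, 2.0, 2.5, 3.0)
// counts:  Array(3, 0, 1, 2)
```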