spark git commit: [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter
Repository: spark Updated Branches: refs/heads/branch-1.5 90245f65c -> 78275c480 [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter ### Summary - Add `lit` function - Add `concat`, `greatest`, `least` functions I think we need to improve `collect` function in order to implement `struct` function. Since `collect` doesn't work with arguments which includes a nested `list` variable. It seems that a list against `struct` still has `jobj` classes. So it would be better to solve this problem on another issue. ### JIRA [[SPARK-9871] Add expression functions into SparkR which have a variable parameter - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9871) Author: Yu ISHIKAWA Closes #8194 from yu-iskw/SPARK-9856. (cherry picked from commit 26e760581fdf7ca913da93fa80e73b7ddabcedf6) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78275c48 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78275c48 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78275c48 Branch: refs/heads/branch-1.5 Commit: 78275c48035d65359f4749b2da3faa3cc95bd607 Parents: 90245f6 Author: Yu ISHIKAWA Authored: Sun Aug 16 23:33:20 2015 -0700 Committer: Shivaram Venkataraman Committed: Sun Aug 16 23:33:28 2015 -0700 -- R/pkg/NAMESPACE | 4 R/pkg/R/functions.R | 42 +++ R/pkg/R/generics.R | 16 + R/pkg/inst/tests/test_sparkSQL.R | 13 +++ 4 files changed, 75 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/78275c48/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index b2d92bd..fd9dfdf 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -98,6 +98,7 @@ exportMethods("abs", "contains", "cos", "cosh", + "concat", "countDistinct", "desc", "endsWith", @@ -106,10 +107,13 @@ exportMethods("abs", "floor", "getField", "getItem", + "greatest", "hypot", "isNotNull", "isNull", + "lit", "last", + "least", "like", "log", "log10", http://git-wip-us.apache.org/repos/asf/spark/blob/78275c48/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a15d2d5..6eef4d6 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -67,6 +67,14 @@ createFunctions <- function() { createFunctions() +#' @rdname functions +#' @return Creates a Column class of literal value. +setMethod("lit", signature("ANY"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "lit", ifelse(class(x) == "Column", x@jc, x)) +column(jc) + }) + #' Approx Count Distinct #' #' @rdname functions @@ -94,6 +102,40 @@ setMethod("countDistinct", }) #' @rdname functions +#' @return Concatenates multiple input string columns together into a single string column. +setMethod("concat", + signature(x = "Column"), + function(x, ...) { +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "concat", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions +#' @return Returns the greatest value of the list of column names, skipping null values. +#' This function takes at least 2 parameters. It will return null if all parameters are null. +setMethod("greatest", + signature(x = "Column"), + function(x, ...) { +stopifnot(length(list(...)) > 0) +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "greatest", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions +#' @return Returns the least value of the list of column names, skipping null values. 
+#' This function takes at least 2 parameters. It will return null iff all parameters are null. +setMethod("least", + signature(x = "Column"), + function(x, ...) { +stopifnot(length(list(...)) > 0) +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "least", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions #' @aliases ceil setMethod("ceiling", signature(x = "Column"), http:
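A minimal usage sketch for the new `lit()` and `concat()` wrappers added above may help; the data, column names, and local master are illustrative assumptions, not taken from the patch, and the 1.5-era `sparkR.init()`/`sparkRSQL.init()` entry points are used:

```
# Usage sketch for lit() and concat(); data and names are illustrative.
library(SparkR)

sc <- sparkR.init(master = "local[2]")
sqlContext <- sparkRSQL.init(sc)

people <- createDataFrame(sqlContext,
                          data.frame(first = c("Yu", "Cheng"),
                                     last  = c("ISHIKAWA", "Lian"),
                                     stringsAsFactors = FALSE))

# lit() wraps an R value as a literal Column (and passes an existing Column through),
# while concat() takes a variable number of Column arguments.
full_name <- concat(people$first, lit(" "), people$last)

collect(select(people, alias(full_name, "full_name")))
```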
spark git commit: [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter
Repository: spark Updated Branches: refs/heads/master ae2370e72 -> 26e760581 [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter ### Summary - Add `lit` function - Add `concat`, `greatest`, `least` functions I think we need to improve `collect` function in order to implement `struct` function. Since `collect` doesn't work with arguments which includes a nested `list` variable. It seems that a list against `struct` still has `jobj` classes. So it would be better to solve this problem on another issue. ### JIRA [[SPARK-9871] Add expression functions into SparkR which have a variable parameter - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9871) Author: Yu ISHIKAWA Closes #8194 from yu-iskw/SPARK-9856. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26e76058 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26e76058 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26e76058 Branch: refs/heads/master Commit: 26e760581fdf7ca913da93fa80e73b7ddabcedf6 Parents: ae2370e Author: Yu ISHIKAWA Authored: Sun Aug 16 23:33:20 2015 -0700 Committer: Shivaram Venkataraman Committed: Sun Aug 16 23:33:20 2015 -0700 -- R/pkg/NAMESPACE | 4 R/pkg/R/functions.R | 42 +++ R/pkg/R/generics.R | 16 + R/pkg/inst/tests/test_sparkSQL.R | 13 +++ 4 files changed, 75 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index b2d92bd..fd9dfdf 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -98,6 +98,7 @@ exportMethods("abs", "contains", "cos", "cosh", + "concat", "countDistinct", "desc", "endsWith", @@ -106,10 +107,13 @@ exportMethods("abs", "floor", "getField", "getItem", + "greatest", "hypot", "isNotNull", "isNull", + "lit", "last", + "least", "like", "log", "log10", http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a15d2d5..6eef4d6 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -67,6 +67,14 @@ createFunctions <- function() { createFunctions() +#' @rdname functions +#' @return Creates a Column class of literal value. +setMethod("lit", signature("ANY"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "lit", ifelse(class(x) == "Column", x@jc, x)) +column(jc) + }) + #' Approx Count Distinct #' #' @rdname functions @@ -94,6 +102,40 @@ setMethod("countDistinct", }) #' @rdname functions +#' @return Concatenates multiple input string columns together into a single string column. +setMethod("concat", + signature(x = "Column"), + function(x, ...) { +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "concat", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions +#' @return Returns the greatest value of the list of column names, skipping null values. +#' This function takes at least 2 parameters. It will return null if all parameters are null. +setMethod("greatest", + signature(x = "Column"), + function(x, ...) { +stopifnot(length(list(...)) > 0) +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "greatest", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions +#' @return Returns the least value of the list of column names, skipping null values. +#' This function takes at least 2 parameters. It will return null iff all parameters are null. 
+setMethod("least", + signature(x = "Column"), + function(x, ...) { +stopifnot(length(list(...)) > 0) +jcols <- lapply(list(x, ...), function(x) { x@jc }) +jc <- callJStatic("org.apache.spark.sql.functions", "least", listToSeq(jcols)) +column(jc) + }) + +#' @rdname functions #' @aliases ceil setMethod("ceiling", signature(x = "Column"), http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/R/generics.R ---
spark git commit: [SPARK-10005] [SQL] Fixes schema merging for nested structs
Repository: spark Updated Branches: refs/heads/branch-1.5 e2c6ef810 -> 90245f65c [SPARK-10005] [SQL] Fixes schema merging for nested structs In case of schema merging, we only handled first level fields when converting Parquet groups to `InternalRow`s. Nested struct fields are not properly handled. For example, the schema of a Parquet file to be read can be: ``` message individual { required group f1 { optional binary f11 (utf8); } } ``` while the global schema is: ``` message global { required group f1 { optional binary f11 (utf8); optional int32 f12; } } ``` This PR fixes this issue by padding missing fields when creating actual converters. Author: Cheng Lian Closes #8228 from liancheng/spark-10005/nested-schema-merging. (cherry picked from commit ae2370e72f93db8a28b262e8252c55fe1fc9873c) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90245f65 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90245f65 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90245f65 Branch: refs/heads/branch-1.5 Commit: 90245f65c94a40d3210207abaf6f136f5ce2861f Parents: e2c6ef8 Author: Cheng Lian Authored: Sun Aug 16 10:17:58 2015 -0700 Committer: Yin Huai Committed: Sun Aug 16 10:18:08 2015 -0700 -- .../parquet/CatalystReadSupport.scala | 19 -- .../parquet/CatalystRowConverter.scala | 70 ++-- .../parquet/CatalystSchemaConverter.scala | 15 + .../datasources/parquet/ParquetQuerySuite.scala | 30 - 4 files changed, 112 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 4049795..a4679bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { + // Called after `init()` when initializing Parquet record reader. override def prepareForRead( conf: Configuration, keyValueMetaData: JMap[String, String], @@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // available if the target file is written by Spark SQL. .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) }.map(StructType.fromString).getOrElse { -logDebug("Catalyst schema not available, falling back to Parquet schema") +logInfo("Catalyst schema not available, falling back to Parquet schema") toCatalyst.convert(parquetRequestedSchema) } -logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema") +logInfo { + s"""Going to read the following fields from the Parquet file: + | + |Parquet form: + |$parquetRequestedSchema + | + |Catalyst form: + |$catalystRequestedSchema + """.stripMargin +} + new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) } + // Called before `prepareForRead()` when initializing Parquet record reader. 
override def init(context: InitContext): ReadContext = { val conf = context.getConfiguration // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst -// schema of this file from its the metadata. +// schema of this file from its metadata. val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) // Optional schema of requested columns, in the form of a string serialized from a Catalyst @@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) -logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema") new ReadContext(parquetRequestedSchema, metadata) } } http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -
spark git commit: [SPARK-10005] [SQL] Fixes schema merging for nested structs
Repository: spark Updated Branches: refs/heads/master cf016075a -> ae2370e72 [SPARK-10005] [SQL] Fixes schema merging for nested structs In case of schema merging, we only handled first level fields when converting Parquet groups to `InternalRow`s. Nested struct fields are not properly handled. For example, the schema of a Parquet file to be read can be: ``` message individual { required group f1 { optional binary f11 (utf8); } } ``` while the global schema is: ``` message global { required group f1 { optional binary f11 (utf8); optional int32 f12; } } ``` This PR fixes this issue by padding missing fields when creating actual converters. Author: Cheng Lian Closes #8228 from liancheng/spark-10005/nested-schema-merging. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae2370e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae2370e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae2370e7 Branch: refs/heads/master Commit: ae2370e72f93db8a28b262e8252c55fe1fc9873c Parents: cf01607 Author: Cheng Lian Authored: Sun Aug 16 10:17:58 2015 -0700 Committer: Yin Huai Committed: Sun Aug 16 10:17:58 2015 -0700 -- .../parquet/CatalystReadSupport.scala | 19 -- .../parquet/CatalystRowConverter.scala | 70 ++-- .../parquet/CatalystSchemaConverter.scala | 15 + .../datasources/parquet/ParquetQuerySuite.scala | 30 - 4 files changed, 112 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ae2370e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 4049795..a4679bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { + // Called after `init()` when initializing Parquet record reader. override def prepareForRead( conf: Configuration, keyValueMetaData: JMap[String, String], @@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // available if the target file is written by Spark SQL. .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) }.map(StructType.fromString).getOrElse { -logDebug("Catalyst schema not available, falling back to Parquet schema") +logInfo("Catalyst schema not available, falling back to Parquet schema") toCatalyst.convert(parquetRequestedSchema) } -logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema") +logInfo { + s"""Going to read the following fields from the Parquet file: + | + |Parquet form: + |$parquetRequestedSchema + | + |Catalyst form: + |$catalystRequestedSchema + """.stripMargin +} + new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) } + // Called before `prepareForRead()` when initializing Parquet record reader. 
override def init(context: InitContext): ReadContext = { val conf = context.getConfiguration // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst -// schema of this file from its the metadata. +// schema of this file from its metadata. val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) // Optional schema of requested columns, in the form of a string serialized from a Catalyst @@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) -logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema") new ReadContext(parquetRequestedSchema, metadata) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ae2370e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala
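To make the fix concrete, here is a hedged Scala sketch of the scenario from the commit message: two Parquet files whose nested `f1` struct carries different sets of fields, read back under a merged schema, with rows from the narrower file padded with nulls for the missing nested field. The paths, object name, and use of the `mergeSchema` reader option are illustrative assumptions, not code from the patch:

```
// Sketch of the nested schema-merging scenario, using the 1.5-era API.
// Paths and names are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object NestedSchemaMergeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("nested-merge").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // File 1: f1 contains a single nested field (like `message individual` above).
    Seq(Tuple1(Tuple1("a"))).toDF("f1").write.parquet("/tmp/nested-merge/part=1")

    // File 2: f1 contains an extra nested field (like the global schema above).
    Seq(Tuple1(("b", 1))).toDF("f1").write.parquet("/tmp/nested-merge/part=2")

    // With schema merging enabled, rows from file 1 should come back with the extra
    // nested field set to null instead of failing -- the padding this PR adds.
    val merged = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/nested-merge")
    merged.printSchema()
    merged.show()

    sc.stop()
  }
}
```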
spark git commit: [SPARK-9973] [SQL] Correct in-memory columnar buffer size
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 fa55c2742 -> e2c6ef810

[SPARK-9973] [SQL] Correct in-memory columnar buffer size

The `initialSize` argument of `ColumnBuilder.initialize()` should be the number of rows rather than bytes. However `InMemoryColumnarTableScan` passes in a byte size, which makes Spark SQL allocate more memory than necessary when building in-memory columnar buffers.

Author: Kun Xu

Closes #8189 from viper-kun/errorSize.

(cherry picked from commit 182f9b7a6d3a3ee7ec7de6abc24e296aa794e4e8)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2c6ef81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2c6ef81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2c6ef81

Branch: refs/heads/branch-1.5
Commit: e2c6ef81030aaf472771d98ec86d1c17119f2c4e
Parents: fa55c27
Author: Kun Xu
Authored: Sun Aug 16 14:44:23 2015 +0800
Committer: Cheng Lian
Committed: Sun Aug 16 19:35:04 2015 +0800

--
 .../org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c6ef81/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index d553bb61..45f15fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -121,8 +121,7 @@ private[sql] case class InMemoryRelation(
       def next(): CachedBatch = {
         val columnBuilders = output.map { attribute =>
           val columnType = ColumnType(attribute.dataType)
-          val initialBufferSize = columnType.defaultSize * batchSize
-          ColumnBuilder(attribute.dataType, initialBufferSize, attribute.name, useCompression)
+          ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
         }.toArray

         var rowCount = 0
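For a rough sense of the over-allocation, the sketch below is illustrative arithmetic only: the batch size and per-value size are assumed example values, and the sizing formula mirrors the description above (a builder that receives a row count and multiplies by the per-value size), not the actual ColumnBuilder internals. Under that assumption, passing a byte count where a row count is expected inflates the buffer by roughly another factor of the column's default size.

```
// Illustrative arithmetic only; constants and formula are assumptions, not Spark source.
object BufferSizeSketch {
  def main(args: Array[String]): Unit = {
    val batchSize   = 10000 // rows per in-memory batch (example value)
    val defaultSize = 8     // bytes per value for, say, a LONG column (example value)

    // Intended: the builder receives a row count and allocates ~rows * bytesPerValue.
    val intendedBytes = batchSize.toLong * defaultSize                      // ~80 KB

    // Before the fix: a byte count (defaultSize * batchSize) was passed where a row
    // count was expected, so the same formula yields defaultSize times more memory.
    val overAllocatedBytes = (defaultSize.toLong * batchSize) * defaultSize // ~640 KB

    println(s"intended: $intendedBytes B, over-allocated: $overAllocatedBytes B")
  }
}
```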
spark git commit: [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
Repository: spark Updated Branches: refs/heads/branch-1.5 4f75ce2e1 -> fa55c2742 [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps The shuffle locality patch made the DAGScheduler aware of shuffle data, but for RDDs that have both narrow and shuffle dependencies, it can cause them to place tasks based on the shuffle dependency instead of the narrow one. This case is common in iterative join-based algorithms like PageRank and ALS, where one RDD is hash-partitioned and one isn't. Author: Matei Zaharia Closes #8220 from mateiz/shuffle-loc-fix. (cherry picked from commit cf016075a006034c24c5b758edb279f3e151d25d) Signed-off-by: Matei Zaharia Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa55c274 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa55c274 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa55c274 Branch: refs/heads/branch-1.5 Commit: fa55c27427bec0291847d254f4424b754dd211c9 Parents: 4f75ce2 Author: Matei Zaharia Authored: Sun Aug 16 00:34:58 2015 -0700 Committer: Matei Zaharia Committed: Sun Aug 16 00:35:09 2015 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 37 +++- .../spark/scheduler/DAGSchedulerSuite.scala | 26 -- 2 files changed, 44 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f1c63d0..dadf83a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1383,33 +1383,36 @@ class DAGScheduler( return rddPrefs.map(TaskLocation(_)) } +// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency +// that has any placement preferences. Ideally we would choose based on transfer sizes, +// but this will do for now. rdd.dependencies.foreach { case n: NarrowDependency[_] => -// If the RDD has narrow dependencies, pick the first partition of the first narrow dep -// that has any placement preferences. Ideally we would choose based on transfer sizes, -// but this will do for now. 
for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } - case s: ShuffleDependency[_, _, _] => -// For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION -// of data as preferred locations -if (shuffleLocalityEnabled && -rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD && -s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) { - // Get the preferred map output locations for this reducer - val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId, -partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION) - if (topLocsForReducer.nonEmpty) { -return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId)) - } -} - case _ => } + +// If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that +// have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations +if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) { + rdd.dependencies.foreach { +case s: ShuffleDependency[_, _, _] => + if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) { +// Get the preferred map output locations for this reducer +val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId, + partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION) +if (topLocsForReducer.nonEmpty) { + return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId)) +} + } +case _ => + } +} Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b0ca49c..a063596 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSc
spark git commit: [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
Repository: spark Updated Branches: refs/heads/master 5f9ce738f -> cf016075a [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps The shuffle locality patch made the DAGScheduler aware of shuffle data, but for RDDs that have both narrow and shuffle dependencies, it can cause them to place tasks based on the shuffle dependency instead of the narrow one. This case is common in iterative join-based algorithms like PageRank and ALS, where one RDD is hash-partitioned and one isn't. Author: Matei Zaharia Closes #8220 from mateiz/shuffle-loc-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf016075 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf016075 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf016075 Branch: refs/heads/master Commit: cf016075a006034c24c5b758edb279f3e151d25d Parents: 5f9ce73 Author: Matei Zaharia Authored: Sun Aug 16 00:34:58 2015 -0700 Committer: Matei Zaharia Committed: Sun Aug 16 00:34:58 2015 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 37 +++- .../spark/scheduler/DAGSchedulerSuite.scala | 26 -- 2 files changed, 44 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f1c63d0..dadf83a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1383,33 +1383,36 @@ class DAGScheduler( return rddPrefs.map(TaskLocation(_)) } +// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency +// that has any placement preferences. Ideally we would choose based on transfer sizes, +// but this will do for now. rdd.dependencies.foreach { case n: NarrowDependency[_] => -// If the RDD has narrow dependencies, pick the first partition of the first narrow dep -// that has any placement preferences. Ideally we would choose based on transfer sizes, -// but this will do for now. 
for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } - case s: ShuffleDependency[_, _, _] => -// For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION -// of data as preferred locations -if (shuffleLocalityEnabled && -rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD && -s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) { - // Get the preferred map output locations for this reducer - val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId, -partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION) - if (topLocsForReducer.nonEmpty) { -return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId)) - } -} - case _ => } + +// If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that +// have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations +if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) { + rdd.dependencies.foreach { +case s: ShuffleDependency[_, _, _] => + if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) { +// Get the preferred map output locations for this reducer +val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId, + partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION) +if (topLocsForReducer.nonEmpty) { + return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId)) +} + } +case _ => + } +} Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b0ca49c..a063596 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -926,7 +9
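The mixed-dependency case the patch describes — a join with one narrow and one shuffle dependency — can be sketched as follows. This is an illustrative snippet with hypothetical names, data, and a local master, not code from the patch; the joined RDD reuses the cached RDD's partitioner, so that side becomes a narrow (one-to-one) dependency whose cached locations should drive task placement after this fix.

```
// Sketch of a join with a narrow dependency on a cached, hash-partitioned RDD and a
// shuffle dependency on an un-partitioned RDD. Names and data are hypothetical.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object JoinLocalitySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-locality").setMaster("local[4]"))

    // Large, hash-partitioned, and cached: the side whose locality we want to follow.
    val links = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
      .partitionBy(new HashPartitioner(4))
      .cache()
    links.count() // materialize the cache

    // Small and un-partitioned: reaches the join through a shuffle dependency.
    val updates = sc.parallelize(1 to 1000).map(i => (i, i * 2.0))

    // The join picks up links' partitioner, so it has a narrow dependency on `links`
    // and a shuffle dependency on `updates`; task placement should follow the
    // executors holding the cached `links` partitions, not the shuffle outputs.
    val joined = links.join(updates)
    println(joined.count())

    sc.stop()
  }
}
```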
spark git commit: [SPARK-8844] [SPARKR] head/collect is broken in SparkR.
Repository: spark Updated Branches: refs/heads/master 182f9b7a6 -> 5f9ce738f [SPARK-8844] [SPARKR] head/collect is broken in SparkR. This is a WIP patch for SPARK-8844 for collecting reviews. This bug is about reading an empty DataFrame. in readCol(), lapply(1:numRows, function(x) { does not take into consideration the case where numRows = 0. Will add unit test case. Author: Sun Rui Closes #7419 from sun-rui/SPARK-8844. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f9ce738 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f9ce738 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f9ce738 Branch: refs/heads/master Commit: 5f9ce738fe6bab3f0caffad0df1d3876178cf469 Parents: 182f9b7 Author: Sun Rui Authored: Sun Aug 16 00:30:02 2015 -0700 Committer: Shivaram Venkataraman Committed: Sun Aug 16 00:30:02 2015 -0700 -- R/pkg/R/deserialize.R| 16 ++-- R/pkg/inst/tests/test_sparkSQL.R | 20 2 files changed, 30 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f9ce738/R/pkg/R/deserialize.R -- diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 6d364f7..33bf13e 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -176,10 +176,14 @@ readRow <- function(inputCon) { # Take a single column as Array[Byte] and deserialize it into an atomic vector readCol <- function(inputCon, numRows) { - # sapply can not work with POSIXlt - do.call(c, lapply(1:numRows, function(x) { -value <- readObject(inputCon) -# Replace NULL with NA so we can coerce to vectors -if (is.null(value)) NA else value - })) + if (numRows > 0) { +# sapply can not work with POSIXlt +do.call(c, lapply(1:numRows, function(x) { + value <- readObject(inputCon) + # Replace NULL with NA so we can coerce to vectors + if (is.null(value)) NA else value +})) + } else { +vector() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/5f9ce738/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index e6d3b21..c77f633 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -408,6 +408,14 @@ test_that("collect() returns a data.frame", { expect_equal(names(rdf)[1], "age") expect_equal(nrow(rdf), 3) expect_equal(ncol(rdf), 2) + + # collect() returns data correctly from a DataFrame with 0 row + df0 <- limit(df, 0) + rdf <- collect(df0) + expect_true(is.data.frame(rdf)) + expect_equal(names(rdf)[1], "age") + expect_equal(nrow(rdf), 0) + expect_equal(ncol(rdf), 2) }) test_that("limit() returns DataFrame with the correct number of rows", { @@ -492,6 +500,18 @@ test_that("head() and first() return the correct data", { testFirst <- first(df) expect_equal(nrow(testFirst), 1) + + # head() and first() return the correct data on + # a DataFrame with 0 row + df0 <- limit(df, 0) + + testHead <- head(df0) + expect_equal(nrow(testHead), 0) + expect_equal(ncol(testHead), 2) + + testFirst <- first(df0) + expect_equal(nrow(testFirst), 0) + expect_equal(ncol(testFirst), 2) }) test_that("distinct() and unique on DataFrames", { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8844] [SPARKR] head/collect is broken in SparkR.
Repository: spark Updated Branches: refs/heads/branch-1.5 881baf100 -> 4f75ce2e1 [SPARK-8844] [SPARKR] head/collect is broken in SparkR. This is a WIP patch for SPARK-8844 for collecting reviews. This bug is about reading an empty DataFrame. in readCol(), lapply(1:numRows, function(x) { does not take into consideration the case where numRows = 0. Will add unit test case. Author: Sun Rui Closes #7419 from sun-rui/SPARK-8844. (cherry picked from commit 5f9ce738fe6bab3f0caffad0df1d3876178cf469) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f75ce2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f75ce2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f75ce2e Branch: refs/heads/branch-1.5 Commit: 4f75ce2e193c813f4e3ad067749b6e7b4f0ee135 Parents: 881baf1 Author: Sun Rui Authored: Sun Aug 16 00:30:02 2015 -0700 Committer: Shivaram Venkataraman Committed: Sun Aug 16 00:30:10 2015 -0700 -- R/pkg/R/deserialize.R| 16 ++-- R/pkg/inst/tests/test_sparkSQL.R | 20 2 files changed, 30 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f75ce2e/R/pkg/R/deserialize.R -- diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 6d364f7..33bf13e 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -176,10 +176,14 @@ readRow <- function(inputCon) { # Take a single column as Array[Byte] and deserialize it into an atomic vector readCol <- function(inputCon, numRows) { - # sapply can not work with POSIXlt - do.call(c, lapply(1:numRows, function(x) { -value <- readObject(inputCon) -# Replace NULL with NA so we can coerce to vectors -if (is.null(value)) NA else value - })) + if (numRows > 0) { +# sapply can not work with POSIXlt +do.call(c, lapply(1:numRows, function(x) { + value <- readObject(inputCon) + # Replace NULL with NA so we can coerce to vectors + if (is.null(value)) NA else value +})) + } else { +vector() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/4f75ce2e/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index e6d3b21..c77f633 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -408,6 +408,14 @@ test_that("collect() returns a data.frame", { expect_equal(names(rdf)[1], "age") expect_equal(nrow(rdf), 3) expect_equal(ncol(rdf), 2) + + # collect() returns data correctly from a DataFrame with 0 row + df0 <- limit(df, 0) + rdf <- collect(df0) + expect_true(is.data.frame(rdf)) + expect_equal(names(rdf)[1], "age") + expect_equal(nrow(rdf), 0) + expect_equal(ncol(rdf), 2) }) test_that("limit() returns DataFrame with the correct number of rows", { @@ -492,6 +500,18 @@ test_that("head() and first() return the correct data", { testFirst <- first(df) expect_equal(nrow(testFirst), 1) + + # head() and first() return the correct data on + # a DataFrame with 0 row + df0 <- limit(df, 0) + + testHead <- head(df0) + expect_equal(nrow(testHead), 0) + expect_equal(ncol(testHead), 2) + + testFirst <- first(df0) + expect_equal(nrow(testFirst), 0) + expect_equal(ncol(testFirst), 2) }) test_that("distinct() and unique on DataFrames", { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
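The added tests above already exercise the fix; for completeness, here is a standalone SparkR sketch of the zero-row case, with illustrative object names and data:

```
# Sketch of the zero-row case covered by the fix; names and data are illustrative.
library(SparkR)

sc <- sparkR.init(master = "local[2]")
sqlContext <- sparkRSQL.init(sc)

df <- createDataFrame(sqlContext,
                      data.frame(age  = c(30L, 19L, 25L),
                                 name = c("Andy", "Justin", "Michael"),
                                 stringsAsFactors = FALSE))

# limit(df, 0) keeps the schema but drops all rows.
df0 <- limit(df, 0)

# Before the fix, readCol() ran lapply(1:numRows, ...) even when numRows was 0; since
# 1:0 is c(1, 0) in R, deserialization went wrong instead of returning an empty column.
collect(df0)  # 0-row data.frame with columns age and name
head(df0)     # 0 rows, 2 columns
first(df0)    # 0 rows, 2 columns
```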