spark git commit: [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter

2015-08-16 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 90245f65c -> 78275c480


[SPARK-9871] [SPARKR] Add expression functions into SparkR which have a 
variable parameter

### Summary

- Add `lit` function
- Add `concat`, `greatest`, `least` functions

I think we need to improve the `collect` function before we can implement a
`struct` function, since `collect` doesn't work with arguments that include a
nested `list` variable: a list backing a `struct` still carries `jobj` classes.
It would be better to address that in a separate issue.
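
For reference, these SparkR wrappers build the same variadic `Column`
expressions that the Scala API exposes, by calling
`org.apache.spark.sql.functions` via `callJStatic`. A minimal Scala sketch of
the underlying functions (assuming a `SQLContext` named `sqlContext`; the toy
DataFrame and column names are only for illustration):

```
import org.apache.spark.sql.functions.{lit, concat, greatest, least}
import sqlContext.implicits._

// Hypothetical toy DataFrame with two string and three numeric columns.
val df = Seq((1, 7, 3, "Jane", "Doe")).toDF("a", "b", "c", "first", "last")

df.select(
  concat($"first", lit(" "), $"last").as("full_name"), // concatenate string columns
  greatest($"a", $"b", $"c").as("max_abc"),            // takes >= 2 columns, skips nulls
  least($"a", $"b", $"c").as("min_abc")                // takes >= 2 columns, skips nulls
).show()
```

The SparkR equivalents added in this patch, e.g. `concat(df$first, lit(" "),
df$last)`, construct the same JVM-side `Column` objects.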

### JIRA
[[SPARK-9871] Add expression functions into SparkR which have a variable 
parameter - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9871)

Author: Yu ISHIKAWA 

Closes #8194 from yu-iskw/SPARK-9856.

(cherry picked from commit 26e760581fdf7ca913da93fa80e73b7ddabcedf6)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78275c48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78275c48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78275c48

Branch: refs/heads/branch-1.5
Commit: 78275c48035d65359f4749b2da3faa3cc95bd607
Parents: 90245f6
Author: Yu ISHIKAWA 
Authored: Sun Aug 16 23:33:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun Aug 16 23:33:28 2015 -0700

--
 R/pkg/NAMESPACE  |  4 
 R/pkg/R/functions.R  | 42 +++
 R/pkg/R/generics.R   | 16 +
 R/pkg/inst/tests/test_sparkSQL.R | 13 +++
 4 files changed, 75 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/78275c48/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b2d92bd..fd9dfdf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -98,6 +98,7 @@ exportMethods("abs",
   "contains",
   "cos",
   "cosh",
+  "concat",
   "countDistinct",
   "desc",
   "endsWith",
@@ -106,10 +107,13 @@ exportMethods("abs",
   "floor",
   "getField",
   "getItem",
+  "greatest",
   "hypot",
   "isNotNull",
   "isNull",
+  "lit",
   "last",
+  "least",
   "like",
   "log",
   "log10",

http://git-wip-us.apache.org/repos/asf/spark/blob/78275c48/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a15d2d5..6eef4d6 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -67,6 +67,14 @@ createFunctions <- function() {
 
 createFunctions()
 
+#' @rdname functions
+#' @return Creates a Column class of literal value.
+setMethod("lit", signature("ANY"),
+  function(x) {
+jc <- callJStatic("org.apache.spark.sql.functions", "lit", 
ifelse(class(x) == "Column", x@jc, x))
+column(jc)
+  })
+
 #' Approx Count Distinct
 #'
 #' @rdname functions
@@ -94,6 +102,40 @@ setMethod("countDistinct",
   })
 
 #' @rdname functions
+#' @return Concatenates multiple input string columns together into a single 
string column.
+setMethod("concat",
+  signature(x = "Column"),
+  function(x, ...) {
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "concat", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
+#' @return Returns the greatest value of the list of column names, skipping 
null values.
+#' This function takes at least 2 parameters. It will return null if 
all parameters are null.
+setMethod("greatest",
+  signature(x = "Column"),
+  function(x, ...) {
+stopifnot(length(list(...)) > 0)
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "greatest", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
+#' @return Returns the least value of the list of column names, skipping null 
values.
+#' This function takes at least 2 parameters. It will return null iff 
all parameters are null.
+setMethod("least",
+  signature(x = "Column"),
+  function(x, ...) {
+stopifnot(length(list(...)) > 0)
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "least", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
 #' @aliases ceil
 setMethod("ceiling",
   signature(x = "Column"),


spark git commit: [SPARK-9871] [SPARKR] Add expression functions into SparkR which have a variable parameter

2015-08-16 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master ae2370e72 -> 26e760581


[SPARK-9871] [SPARKR] Add expression functions into SparkR which have a 
variable parameter

### Summary

- Add `lit` function
- Add `concat`, `greatest`, `least` functions

I think we need to improve the `collect` function before we can implement a
`struct` function, since `collect` doesn't work with arguments that include a
nested `list` variable: a list backing a `struct` still carries `jobj` classes.
It would be better to address that in a separate issue.

### JIRA
[[SPARK-9871] Add expression functions into SparkR which have a variable 
parameter - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9871)

Author: Yu ISHIKAWA 

Closes #8194 from yu-iskw/SPARK-9856.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26e76058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26e76058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26e76058

Branch: refs/heads/master
Commit: 26e760581fdf7ca913da93fa80e73b7ddabcedf6
Parents: ae2370e
Author: Yu ISHIKAWA 
Authored: Sun Aug 16 23:33:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun Aug 16 23:33:20 2015 -0700

--
 R/pkg/NAMESPACE  |  4 
 R/pkg/R/functions.R  | 42 +++
 R/pkg/R/generics.R   | 16 +
 R/pkg/inst/tests/test_sparkSQL.R | 13 +++
 4 files changed, 75 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b2d92bd..fd9dfdf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -98,6 +98,7 @@ exportMethods("abs",
   "contains",
   "cos",
   "cosh",
+  "concat",
   "countDistinct",
   "desc",
   "endsWith",
@@ -106,10 +107,13 @@ exportMethods("abs",
   "floor",
   "getField",
   "getItem",
+  "greatest",
   "hypot",
   "isNotNull",
   "isNull",
+  "lit",
   "last",
+  "least",
   "like",
   "log",
   "log10",

http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a15d2d5..6eef4d6 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -67,6 +67,14 @@ createFunctions <- function() {
 
 createFunctions()
 
+#' @rdname functions
+#' @return Creates a Column class of literal value.
+setMethod("lit", signature("ANY"),
+  function(x) {
+jc <- callJStatic("org.apache.spark.sql.functions", "lit", 
ifelse(class(x) == "Column", x@jc, x))
+column(jc)
+  })
+
 #' Approx Count Distinct
 #'
 #' @rdname functions
@@ -94,6 +102,40 @@ setMethod("countDistinct",
   })
 
 #' @rdname functions
+#' @return Concatenates multiple input string columns together into a single 
string column.
+setMethod("concat",
+  signature(x = "Column"),
+  function(x, ...) {
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "concat", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
+#' @return Returns the greatest value of the list of column names, skipping 
null values.
+#' This function takes at least 2 parameters. It will return null if 
all parameters are null.
+setMethod("greatest",
+  signature(x = "Column"),
+  function(x, ...) {
+stopifnot(length(list(...)) > 0)
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "greatest", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
+#' @return Returns the least value of the list of column names, skipping null 
values.
+#' This function takes at least 2 parameters. It will return null iff 
all parameters are null.
+setMethod("least",
+  signature(x = "Column"),
+  function(x, ...) {
+stopifnot(length(list(...)) > 0)
+jcols <- lapply(list(x, ...), function(x) { x@jc })
+jc <- callJStatic("org.apache.spark.sql.functions", "least", 
listToSeq(jcols))
+column(jc)
+  })
+
+#' @rdname functions
 #' @aliases ceil
 setMethod("ceiling",
   signature(x = "Column"),

http://git-wip-us.apache.org/repos/asf/spark/blob/26e76058/R/pkg/R/generics.R
---

spark git commit: [SPARK-10005] [SQL] Fixes schema merging for nested structs

2015-08-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e2c6ef810 -> 90245f65c


[SPARK-10005] [SQL] Fixes schema merging for nested structs

In the case of schema merging, we only handled first-level fields when converting 
Parquet groups to `InternalRow`s; nested struct fields were not properly handled.

For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
optional binary f11 (utf8);
optional int32 f12;
  }
}
```

This PR fixes this issue by padding missing fields when creating actual 
converters.
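
A minimal Scala sketch of the scenario (assuming a `SQLContext` named
`sqlContext` and a writable scratch path; the path and column names are
illustrative): two Parquet parts whose nested `f1` structs differ by one field
are merged on read, and the row from the narrower part should come back padded
with `f12 = null` instead of being read incorrectly.

```
import sqlContext.implicits._
import org.apache.spark.sql.functions.struct

// Part 1: f1 has only f11.
Seq(Tuple1("a")).toDF("f11")
  .select(struct($"f11").as("f1"))
  .write.parquet("/tmp/spark-10005-demo/part=1")

// Part 2: f1 has both f11 and f12.
Seq(("b", 1)).toDF("f11", "f12")
  .select(struct($"f11", $"f12").as("f1"))
  .write.parquet("/tmp/spark-10005-demo/part=2")

// Schema merging yields f1: struct<f11: string, f12: int>;
// the row from part 1 is padded with f12 = null.
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/spark-10005-demo")
merged.printSchema()
merged.show()
```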

Author: Cheng Lian 

Closes #8228 from liancheng/spark-10005/nested-schema-merging.

(cherry picked from commit ae2370e72f93db8a28b262e8252c55fe1fc9873c)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90245f65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90245f65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90245f65

Branch: refs/heads/branch-1.5
Commit: 90245f65c94a40d3210207abaf6f136f5ce2861f
Parents: e2c6ef8
Author: Cheng Lian 
Authored: Sun Aug 16 10:17:58 2015 -0700
Committer: Yin Huai 
Committed: Sun Aug 16 10:18:08 2015 -0700

--
 .../parquet/CatalystReadSupport.scala   | 19 --
 .../parquet/CatalystRowConverter.scala  | 70 ++--
 .../parquet/CatalystSchemaConverter.scala   | 15 +
 .../datasources/parquet/ParquetQuerySuite.scala | 30 -
 4 files changed, 112 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795..a4679bb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
   conf: Configuration,
   keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
   // available if the target file is written by Spark SQL.
   .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
   }.map(StructType.fromString).getOrElse {
-logDebug("Catalyst schema not available, falling back to Parquet 
schema")
+logInfo("Catalyst schema not available, falling back to Parquet 
schema")
 toCatalyst.convert(parquetRequestedSchema)
   }
 
-logDebug(s"Catalyst schema used to read Parquet files: 
$catalystRequestedSchema")
+logInfo {
+  s"""Going to read the following fields from the Parquet file:
+ |
+ |Parquet form:
+ |$parquetRequestedSchema
+ |
+ |Catalyst form:
+ |$catalystRequestedSchema
+   """.stripMargin
+}
+
 new CatalystRecordMaterializer(parquetRequestedSchema, 
catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
 val conf = context.getConfiguration
 
 // If the target file was written by Spark SQL, we should be able to find 
a serialized Catalyst
-// schema of this file from its the metadata.
+// schema of this file from its metadata.
 val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
 // Optional schema of requested columns, in the form of a string 
serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
 
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
 maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-logInfo(s"Going to read Parquet file with these requested columns: 
$parquetRequestedSchema")
 new ReadContext(parquetRequestedSchema, metadata)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
-

spark git commit: [SPARK-10005] [SQL] Fixes schema merging for nested structs

2015-08-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master cf016075a -> ae2370e72


[SPARK-10005] [SQL] Fixes schema merging for nested structs

In the case of schema merging, we only handled first-level fields when converting 
Parquet groups to `InternalRow`s; nested struct fields were not properly handled.

For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
optional binary f11 (utf8);
optional int32 f12;
  }
}
```

This PR fixes this issue by padding missing fields when creating actual 
converters.

Author: Cheng Lian 

Closes #8228 from liancheng/spark-10005/nested-schema-merging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae2370e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae2370e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae2370e7

Branch: refs/heads/master
Commit: ae2370e72f93db8a28b262e8252c55fe1fc9873c
Parents: cf01607
Author: Cheng Lian 
Authored: Sun Aug 16 10:17:58 2015 -0700
Committer: Yin Huai 
Committed: Sun Aug 16 10:17:58 2015 -0700

--
 .../parquet/CatalystReadSupport.scala   | 19 --
 .../parquet/CatalystRowConverter.scala  | 70 ++--
 .../parquet/CatalystSchemaConverter.scala   | 15 +
 .../datasources/parquet/ParquetQuerySuite.scala | 30 -
 4 files changed, 112 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ae2370e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795..a4679bb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
   conf: Configuration,
   keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
   // available if the target file is written by Spark SQL.
   .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
   }.map(StructType.fromString).getOrElse {
-logDebug("Catalyst schema not available, falling back to Parquet 
schema")
+logInfo("Catalyst schema not available, falling back to Parquet 
schema")
 toCatalyst.convert(parquetRequestedSchema)
   }
 
-logDebug(s"Catalyst schema used to read Parquet files: 
$catalystRequestedSchema")
+logInfo {
+  s"""Going to read the following fields from the Parquet file:
+ |
+ |Parquet form:
+ |$parquetRequestedSchema
+ |
+ |Catalyst form:
+ |$catalystRequestedSchema
+   """.stripMargin
+}
+
 new CatalystRecordMaterializer(parquetRequestedSchema, 
catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
 val conf = context.getConfiguration
 
 // If the target file was written by Spark SQL, we should be able to find 
a serialized Catalyst
-// schema of this file from its the metadata.
+// schema of this file from its metadata.
 val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
 // Optional schema of requested columns, in the form of a string 
serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
 
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
 maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-logInfo(s"Going to read Parquet file with these requested columns: 
$parquetRequestedSchema")
 new ReadContext(parquetRequestedSchema, metadata)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae2370e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git 
a/sql/core/src/main/scala

spark git commit: [SPARK-9973] [SQL] Correct in-memory columnar buffer size

2015-08-16 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 fa55c2742 -> e2c6ef810


[SPARK-9973] [SQL] Correct in-memory columnar buffer size

The `initialSize` argument of `ColumnBuilder.initialize()` should be the
number of rows rather than bytes. However, `InMemoryColumnarTableScan`
passes in a byte size, which makes Spark SQL allocate more memory than
necessary when building in-memory columnar buffers.
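
A rough sketch of the over-allocation for a single INT column, assuming the
default batch size of 10,000 rows (`spark.sql.inMemoryColumnarStorage.batchSize`);
the figures are approximate and only illustrate the row-count/byte-count mix-up:

```
val batchSize = 10000   // spark.sql.inMemoryColumnarStorage.batchSize default
val intDefaultSize = 4  // bytes per INT value

// The builder sizes its buffer as roughly (row count) * (default value size).
// Before the fix, a byte count was passed where the row count was expected:
val bytesBefore = (intDefaultSize * batchSize) * intDefaultSize // ~160,000 bytes
// After the fix, the row count is passed, as the API expects:
val bytesAfter = batchSize * intDefaultSize                     // ~40,000 bytes
```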

Author: Kun Xu 

Closes #8189 from viper-kun/errorSize.

(cherry picked from commit 182f9b7a6d3a3ee7ec7de6abc24e296aa794e4e8)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2c6ef81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2c6ef81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2c6ef81

Branch: refs/heads/branch-1.5
Commit: e2c6ef81030aaf472771d98ec86d1c17119f2c4e
Parents: fa55c27
Author: Kun Xu 
Authored: Sun Aug 16 14:44:23 2015 +0800
Committer: Cheng Lian 
Committed: Sun Aug 16 19:35:04 2015 +0800

--
 .../org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e2c6ef81/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index d553bb61..45f15fd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -121,8 +121,7 @@ private[sql] case class InMemoryRelation(
 def next(): CachedBatch = {
   val columnBuilders = output.map { attribute =>
 val columnType = ColumnType(attribute.dataType)
-val initialBufferSize = columnType.defaultSize * batchSize
-ColumnBuilder(attribute.dataType, initialBufferSize, 
attribute.name, useCompression)
+ColumnBuilder(attribute.dataType, batchSize, attribute.name, 
useCompression)
   }.toArray
 
   var rowCount = 0





spark git commit: [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

2015-08-16 Thread matei
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f75ce2e1 -> fa55c2742


[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it can
cause tasks to be placed based on the shuffle dependency instead of the
narrow one. This case is common in iterative join-based algorithms like
PageRank and ALS, where one RDD is hash-partitioned and one isn't.
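
A minimal Scala sketch of the problematic shape (assuming a `SparkContext`
named `sc`; the data is toy-sized): `links` is hash-partitioned and `ranks` is
not, so `links.join(ranks)` has a narrow dependency on `links` and a shuffle
dependency on `ranks`. With this change the scheduler again prefers the narrow
dependency's locations over the reducer-side shuffle locations.

```
import org.apache.spark.HashPartitioner

// Hash-partitioned adjacency list, cached so its partitions have stable locations.
val links = sc.parallelize(Seq(1 -> Seq(2, 3), 2 -> Seq(1), 3 -> Seq(1)))
  .partitionBy(new HashPartitioner(8))
  .cache()

// Ranks start out un-partitioned, as in a typical PageRank iteration.
val ranks = sc.parallelize(Seq(1 -> 1.0, 2 -> 1.0, 3 -> 1.0))

// join() reuses links' partitioner, so the resulting RDD has a narrow
// (one-to-one) dependency on links and a shuffle dependency on ranks.
val contribs = links.join(ranks)
contribs.count()
```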

Author: Matei Zaharia 

Closes #8220 from mateiz/shuffle-loc-fix.

(cherry picked from commit cf016075a006034c24c5b758edb279f3e151d25d)
Signed-off-by: Matei Zaharia 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa55c274
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa55c274
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa55c274

Branch: refs/heads/branch-1.5
Commit: fa55c27427bec0291847d254f4424b754dd211c9
Parents: 4f75ce2
Author: Matei Zaharia 
Authored: Sun Aug 16 00:34:58 2015 -0700
Committer: Matei Zaharia 
Committed: Sun Aug 16 00:35:09 2015 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 37 +++-
 .../spark/scheduler/DAGSchedulerSuite.scala | 26 --
 2 files changed, 44 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0..dadf83a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
   return rddPrefs.map(TaskLocation(_))
 }
 
+// If the RDD has narrow dependencies, pick the first partition of the 
first narrow dependency
+// that has any placement preferences. Ideally we would choose based on 
transfer sizes,
+// but this will do for now.
 rdd.dependencies.foreach {
   case n: NarrowDependency[_] =>
-// If the RDD has narrow dependencies, pick the first partition of the 
first narrow dep
-// that has any placement preferences. Ideally we would choose based 
on transfer sizes,
-// but this will do for now.
 for (inPart <- n.getParents(partition)) {
   val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
   if (locs != Nil) {
 return locs
   }
 }
-  case s: ShuffleDependency[_, _, _] =>
-// For shuffle dependencies, pick locations which have at least 
REDUCER_PREF_LOCS_FRACTION
-// of data as preferred locations
-if (shuffleLocalityEnabled &&
-rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-  // Get the preferred map output locations for this reducer
-  val topLocsForReducer = 
mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-  if (topLocsForReducer.nonEmpty) {
-return topLocsForReducer.get.map(loc => TaskLocation(loc.host, 
loc.executorId))
-  }
-}
-
   case _ =>
 }
+
+// If the RDD has shuffle dependencies and shuffle locality is enabled, 
pick locations that
+// have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+if (shuffleLocalityEnabled && rdd.partitions.length < 
SHUFFLE_PREF_REDUCE_THRESHOLD) {
+  rdd.dependencies.foreach {
+case s: ShuffleDependency[_, _, _] =>
+  if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+// Get the preferred map output locations for this reducer
+val topLocsForReducer = 
mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+  partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+if (topLocsForReducer.nonEmpty) {
+  return topLocsForReducer.get.map(loc => TaskLocation(loc.host, 
loc.executorId))
+}
+  }
+case _ =>
+  }
+}
 Nil
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b0ca49c..a063596 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSc

spark git commit: [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

2015-08-16 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master 5f9ce738f -> cf016075a


[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it can
cause tasks to be placed based on the shuffle dependency instead of the
narrow one. This case is common in iterative join-based algorithms like
PageRank and ALS, where one RDD is hash-partitioned and one isn't.

Author: Matei Zaharia 

Closes #8220 from mateiz/shuffle-loc-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf016075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf016075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf016075

Branch: refs/heads/master
Commit: cf016075a006034c24c5b758edb279f3e151d25d
Parents: 5f9ce73
Author: Matei Zaharia 
Authored: Sun Aug 16 00:34:58 2015 -0700
Committer: Matei Zaharia 
Committed: Sun Aug 16 00:34:58 2015 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 37 +++-
 .../spark/scheduler/DAGSchedulerSuite.scala | 26 --
 2 files changed, 44 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0..dadf83a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
   return rddPrefs.map(TaskLocation(_))
 }
 
+// If the RDD has narrow dependencies, pick the first partition of the 
first narrow dependency
+// that has any placement preferences. Ideally we would choose based on 
transfer sizes,
+// but this will do for now.
 rdd.dependencies.foreach {
   case n: NarrowDependency[_] =>
-// If the RDD has narrow dependencies, pick the first partition of the 
first narrow dep
-// that has any placement preferences. Ideally we would choose based 
on transfer sizes,
-// but this will do for now.
 for (inPart <- n.getParents(partition)) {
   val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
   if (locs != Nil) {
 return locs
   }
 }
-  case s: ShuffleDependency[_, _, _] =>
-// For shuffle dependencies, pick locations which have at least 
REDUCER_PREF_LOCS_FRACTION
-// of data as preferred locations
-if (shuffleLocalityEnabled &&
-rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-  // Get the preferred map output locations for this reducer
-  val topLocsForReducer = 
mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-  if (topLocsForReducer.nonEmpty) {
-return topLocsForReducer.get.map(loc => TaskLocation(loc.host, 
loc.executorId))
-  }
-}
-
   case _ =>
 }
+
+// If the RDD has shuffle dependencies and shuffle locality is enabled, 
pick locations that
+// have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+if (shuffleLocalityEnabled && rdd.partitions.length < 
SHUFFLE_PREF_REDUCE_THRESHOLD) {
+  rdd.dependencies.foreach {
+case s: ShuffleDependency[_, _, _] =>
+  if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+// Get the preferred map output locations for this reducer
+val topLocsForReducer = 
mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+  partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+if (topLocsForReducer.nonEmpty) {
+  return topLocsForReducer.get.map(loc => TaskLocation(loc.host, 
loc.executorId))
+}
+  }
+case _ =>
+  }
+}
 Nil
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b0ca49c..a063596 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -926,7 +9

spark git commit: [SPARK-8844] [SPARKR] head/collect is broken in SparkR.

2015-08-16 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 182f9b7a6 -> 5f9ce738f


[SPARK-8844] [SPARKR] head/collect is broken in SparkR.

This is a WIP patch for SPARK-8844 for collecting reviews.

This bug is about reading an empty DataFrame: in readCol(),
  lapply(1:numRows, function(x) {
does not take into account the case where numRows = 0 (in R, 1:0 evaluates
to c(1, 0), so the loop body still runs on an empty column).

Will add a unit test case.

Author: Sun Rui 

Closes #7419 from sun-rui/SPARK-8844.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f9ce738
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f9ce738
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f9ce738

Branch: refs/heads/master
Commit: 5f9ce738fe6bab3f0caffad0df1d3876178cf469
Parents: 182f9b7
Author: Sun Rui 
Authored: Sun Aug 16 00:30:02 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun Aug 16 00:30:02 2015 -0700

--
 R/pkg/R/deserialize.R| 16 ++--
 R/pkg/inst/tests/test_sparkSQL.R | 20 
 2 files changed, 30 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f9ce738/R/pkg/R/deserialize.R
--
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index 6d364f7..33bf13e 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -176,10 +176,14 @@ readRow <- function(inputCon) {
 
 # Take a single column as Array[Byte] and deserialize it into an atomic vector
 readCol <- function(inputCon, numRows) {
-  # sapply can not work with POSIXlt
-  do.call(c, lapply(1:numRows, function(x) {
-value <- readObject(inputCon)
-# Replace NULL with NA so we can coerce to vectors
-if (is.null(value)) NA else value
-  }))
+  if (numRows > 0) {
+# sapply can not work with POSIXlt
+do.call(c, lapply(1:numRows, function(x) {
+  value <- readObject(inputCon)
+  # Replace NULL with NA so we can coerce to vectors
+  if (is.null(value)) NA else value
+}))
+  } else {
+vector()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5f9ce738/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index e6d3b21..c77f633 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -408,6 +408,14 @@ test_that("collect() returns a data.frame", {
   expect_equal(names(rdf)[1], "age")
   expect_equal(nrow(rdf), 3)
   expect_equal(ncol(rdf), 2)
+
+  # collect() returns data correctly from a DataFrame with 0 row
+  df0 <- limit(df, 0)
+  rdf <- collect(df0)
+  expect_true(is.data.frame(rdf))
+  expect_equal(names(rdf)[1], "age")
+  expect_equal(nrow(rdf), 0)
+  expect_equal(ncol(rdf), 2)
 })
 
 test_that("limit() returns DataFrame with the correct number of rows", {
@@ -492,6 +500,18 @@ test_that("head() and first() return the correct data", {
 
   testFirst <- first(df)
   expect_equal(nrow(testFirst), 1)
+
+  # head() and first() return the correct data on
+  # a DataFrame with 0 row
+  df0 <- limit(df, 0)
+
+  testHead <- head(df0)
+  expect_equal(nrow(testHead), 0)
+  expect_equal(ncol(testHead), 2)
+
+  testFirst <- first(df0)
+  expect_equal(nrow(testFirst), 0)
+  expect_equal(ncol(testFirst), 2)
 })
 
 test_that("distinct() and unique on DataFrames", {





spark git commit: [SPARK-8844] [SPARKR] head/collect is broken in SparkR.

2015-08-16 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 881baf100 -> 4f75ce2e1


[SPARK-8844] [SPARKR] head/collect is broken in SparkR.

This is a WIP patch for SPARK-8844 for collecting reviews.

This bug is about reading an empty DataFrame: in readCol(),
  lapply(1:numRows, function(x) {
does not take into account the case where numRows = 0 (in R, 1:0 evaluates
to c(1, 0), so the loop body still runs on an empty column).

Will add a unit test case.

Author: Sun Rui 

Closes #7419 from sun-rui/SPARK-8844.

(cherry picked from commit 5f9ce738fe6bab3f0caffad0df1d3876178cf469)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f75ce2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f75ce2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f75ce2e

Branch: refs/heads/branch-1.5
Commit: 4f75ce2e193c813f4e3ad067749b6e7b4f0ee135
Parents: 881baf1
Author: Sun Rui 
Authored: Sun Aug 16 00:30:02 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun Aug 16 00:30:10 2015 -0700

--
 R/pkg/R/deserialize.R| 16 ++--
 R/pkg/inst/tests/test_sparkSQL.R | 20 
 2 files changed, 30 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f75ce2e/R/pkg/R/deserialize.R
--
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index 6d364f7..33bf13e 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -176,10 +176,14 @@ readRow <- function(inputCon) {
 
 # Take a single column as Array[Byte] and deserialize it into an atomic vector
 readCol <- function(inputCon, numRows) {
-  # sapply can not work with POSIXlt
-  do.call(c, lapply(1:numRows, function(x) {
-value <- readObject(inputCon)
-# Replace NULL with NA so we can coerce to vectors
-if (is.null(value)) NA else value
-  }))
+  if (numRows > 0) {
+# sapply can not work with POSIXlt
+do.call(c, lapply(1:numRows, function(x) {
+  value <- readObject(inputCon)
+  # Replace NULL with NA so we can coerce to vectors
+  if (is.null(value)) NA else value
+}))
+  } else {
+vector()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f75ce2e/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index e6d3b21..c77f633 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -408,6 +408,14 @@ test_that("collect() returns a data.frame", {
   expect_equal(names(rdf)[1], "age")
   expect_equal(nrow(rdf), 3)
   expect_equal(ncol(rdf), 2)
+
+  # collect() returns data correctly from a DataFrame with 0 row
+  df0 <- limit(df, 0)
+  rdf <- collect(df0)
+  expect_true(is.data.frame(rdf))
+  expect_equal(names(rdf)[1], "age")
+  expect_equal(nrow(rdf), 0)
+  expect_equal(ncol(rdf), 2)
 })
 
 test_that("limit() returns DataFrame with the correct number of rows", {
@@ -492,6 +500,18 @@ test_that("head() and first() return the correct data", {
 
   testFirst <- first(df)
   expect_equal(nrow(testFirst), 1)
+
+  # head() and first() return the correct data on
+  # a DataFrame with 0 row
+  df0 <- limit(df, 0)
+
+  testHead <- head(df0)
+  expect_equal(nrow(testHead), 0)
+  expect_equal(ncol(testHead), 2)
+
+  testFirst <- first(df0)
+  expect_equal(nrow(testFirst), 0)
+  expect_equal(ncol(testFirst), 2)
 })
 
 test_that("distinct() and unique on DataFrames", {

