spark git commit: [SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the same shorten name

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6a996b362 -> 7b6f3a118


[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the 
same shorten name

## What changes were proposed in this pull request?

One of the common usability problems around reading data in Spark (particularly
CSV) is that multiple readers for the same format can conflict on the classpath.

As an example, if someone launches a 2.x Spark shell with the spark-csv package
on the classpath, Spark currently fails in an extremely unfriendly way (see
databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, 
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified 
class name.
  at scala.sys.package$.error(package.scala:27)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error: when multiple sources match
a short name, pick the internal datasource if there is exactly one (the
datasource whose class name has the "org.apache.spark" prefix).
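
For illustration, below is a minimal, standalone sketch of the disambiguation
idea only (it is not the actual `DataSource.lookupDataSource` code; the object
name and warning text are assumptions):

```scala
// Illustrative sketch: prefer the built-in implementation when a short name
// maps to several provider classes. Inputs below are hypothetical.
object PreferInternalSource {
  def pick(shortName: String, matches: Seq[String]): String = {
    val internal = matches.filter(_.startsWith("org.apache.spark"))
    if (matches.size <= 1) {
      matches.headOption.getOrElse(
        throw new ClassNotFoundException(s"Failed to find data source: $shortName"))
    } else if (internal.size == 1) {
      // Exactly one Spark-internal candidate: warn and default to it.
      println(s"WARN Multiple sources found for $shortName " +
        s"(${matches.mkString(", ")}), defaulting to the internal datasource " +
        s"(${internal.head}).")
      internal.head
    } else {
      // Zero or several internal candidates: keep the old behavior and fail.
      sys.error(s"Multiple sources found for $shortName, " +
        "please specify the fully qualified class name.")
    }
  }
}

// Example: PreferInternalSource.pick("csv", Seq(
//   "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat",
//   "com.databricks.spark.csv.DefaultSource15"))
```

With this behavior, the warnings shown below are emitted instead of the
RuntimeException above.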

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

## How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

**positive cases**:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```

```scala
scala> 
spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

**negative cases**:

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: 
com.databricks.spark.csv.CsvRelatio. Please find packages at 
http://spark.apache.org/third-party-projects.html
...
```

Author: hyukjinkwon 

Closes #17916 from HyukjinKwon/datasource-detect.

(cherry picked from commit 3d2131ab4ddead29601fb3c597b798202ac25fdd)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b6f3a11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b6f3a11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b6f3a11

Branch: refs/heads/branch-2.2
Commit: 7b6f3a118e973216264bbf356af2bb1e7870466e
Parents: 6a996b3
Author: hyukjinkwon 
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 10 13:45:07 2017 +0800

spark git commit: [SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the same shorten name

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 771abeb46 -> 3d2131ab4


[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the 
same shorten name

## What changes were proposed in this pull request?

One of the common usability problems around reading data in Spark (particularly
CSV) is that multiple readers for the same format can conflict on the classpath.

As an example, if someone launches a 2.x Spark shell with the spark-csv package
on the classpath, Spark currently fails in an extremely unfriendly way (see
databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, 
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified 
class name.
  at scala.sys.package$.error(package.scala:27)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error: when multiple sources match
a short name, pick the internal datasource if there is exactly one (the
datasource whose class name has the "org.apache.spark" prefix).

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

## How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

**positive cases**:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```

```scala
scala> 
spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

**negative cases**:

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: 
com.databricks.spark.csv.CsvRelatio. Please find packages at 
http://spark.apache.org/third-party-projects.html
...
```

Author: hyukjinkwon 

Closes #17916 from HyukjinKwon/datasource-detect.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d2131ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d2131ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d2131ab

Branch: refs/heads/master
Commit: 3d2131ab4ddead29601fb3c597b798202ac25fdd
Parents: 771abeb
Author: hyukjinkwon 
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 10 13:44:47 2017 +0800

--
 .../sql/execution/datasources/DataSource.scala  | 19 

spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling createJoinKey

2017-05-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 12c937ede -> 50f28dfe4


[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling 
createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when
`LIMIT > 1310720`:
```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
  (
SELECT t1.int, t2.int2
FROM (SELECT * FROM tab1 LIMIT 1310721) t1
INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
ON (t1.int = t2.int AND t1.int2 = t2.int2)
  ) t;
```

This pull request fixes this issue.
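
For reference, a self-contained DataFrame-API reproduction of the same
scenario, mirroring the regression test added in this patch (the local-mode
session setup is an assumption):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

// Assumed local session just for the repro; any SparkSession works.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-17685-repro")
  .getOrCreate()
import spark.implicits._

val df  = Seq((1, 1, "str"), (2, 2, "str")).toDF("int", "int2", "str")
val df2 = Seq((1, 1, "str"), (2, 3, "str")).toDF("int", "int2", "str")

// LIMIT values above 1310720 triggered the codegen IndexOutOfBoundsException
// in SortMergeJoinExec before this fix.
val limit = 1310721
df.limit(limit)
  .join(df2.limit(limit), Seq("int", "int2"), "inner")
  .agg(count($"int"))
  .show()  // expected: a single row with count 1
```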

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #17920 from wangyum/SPARK-17685.

(cherry picked from commit 771abeb46f637592aba2e63db2ed05b6cabfd0be)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50f28dfe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50f28dfe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50f28dfe

Branch: refs/heads/branch-2.1
Commit: 50f28dfe43410dadabbecc62e34fde8bacc8aee6
Parents: 12c937e
Author: Yuming Wang 
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell 
Committed: Tue May 9 19:45:22 2017 -0700

--
 .../spark/sql/execution/joins/SortMergeJoinExec.scala |  1 +
 .../scala/org/apache/spark/sql/DataFrameJoinSuite.scala   | 10 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/50f28dfe/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index a1f9416..89a9b38 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -342,6 +342,7 @@ case class SortMergeJoinExec(
   keys: Seq[Expression],
   input: Seq[Attribute]): Seq[ExprCode] = {
 ctx.INPUT_ROW = row
+ctx.currentVars = null
 keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/50f28dfe/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb5..9383e83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -248,4 +248,14 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 val ab = a.join(b, Seq("a"), "fullouter")
 checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+val limit = 1310721
+val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), 
"inner")
+  .agg(count($"int"))
+checkAnswer(innerJoin, Row(1) :: Nil)
+  }
+
 }





spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling createJoinKey

2017-05-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7600a7ab6 -> 6a996b362


[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling 
createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when
`LIMIT > 1310720`:
```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
  (
SELECT t1.int, t2.int2
FROM (SELECT * FROM tab1 LIMIT 1310721) t1
INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
ON (t1.int = t2.int AND t1.int2 = t2.int2)
  ) t;
```

This pull request fixes this issue.

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #17920 from wangyum/SPARK-17685.

(cherry picked from commit 771abeb46f637592aba2e63db2ed05b6cabfd0be)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a996b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a996b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a996b36

Branch: refs/heads/branch-2.2
Commit: 6a996b36283dcd22ff7aa38968a80f575d2f151e
Parents: 7600a7a
Author: Yuming Wang 
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell 
Committed: Tue May 9 19:45:09 2017 -0700

--
 .../spark/sql/execution/joins/SortMergeJoinExec.scala |  1 +
 .../scala/org/apache/spark/sql/DataFrameJoinSuite.scala   | 10 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a996b36/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index c6aae1a..26fb610 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -371,6 +371,7 @@ case class SortMergeJoinExec(
   keys: Seq[Expression],
   input: Seq[Attribute]): Seq[ExprCode] = {
 ctx.INPUT_ROW = row
+ctx.currentVars = null
 keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a996b36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4a52af6..aef0d7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -264,4 +264,14 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 val ab = a.join(b, Seq("a"), "fullouter")
 checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+val limit = 1310721
+val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), 
"inner")
+  .agg(count($"int"))
+checkAnswer(innerJoin, Row(1) :: Nil)
+  }
+
 }





spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling createJoinKey

2017-05-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master c0189abc7 -> 771abeb46


[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling 
createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when
`LIMIT > 1310720`:
```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
  (
SELECT t1.int, t2.int2
FROM (SELECT * FROM tab1 LIMIT 1310721) t1
INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
ON (t1.int = t2.int AND t1.int2 = t2.int2)
  ) t;
```

This pull request fixes this issue.

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #17920 from wangyum/SPARK-17685.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/771abeb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/771abeb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/771abeb4

Branch: refs/heads/master
Commit: 771abeb46f637592aba2e63db2ed05b6cabfd0be
Parents: c0189ab
Author: Yuming Wang 
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell 
Committed: Tue May 9 19:45:00 2017 -0700

--
 .../spark/sql/execution/joins/SortMergeJoinExec.scala |  1 +
 .../scala/org/apache/spark/sql/DataFrameJoinSuite.scala   | 10 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/771abeb4/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index c6aae1a..26fb610 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -371,6 +371,7 @@ case class SortMergeJoinExec(
   keys: Seq[Expression],
   input: Seq[Attribute]): Seq[ExprCode] = {
 ctx.INPUT_ROW = row
+ctx.currentVars = null
 keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/771abeb4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4a52af6..aef0d7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -264,4 +264,14 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 val ab = a.join(b, Seq("a"), "fullouter")
 checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+val limit = 1310721
+val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), 
"inner")
+  .agg(count($"int"))
+checkAnswer(innerJoin, Row(1) :: Nil)
+  }
+
 }





spark git commit: [SPARK-20373][SQL][SS] Batch queries with 'Dataset/DataFrame.withWatermark()` does not execute

2017-05-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 d191b962d -> 7600a7ab6


[SPARK-20373][SQL][SS] Batch queries with 'Dataset/DataFrame.withWatermark()` 
does not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query with the operation `withWatermark` does not 
execute because the batch planner does not have any rule to explicitly handle 
the EventTimeWatermark logical plan.
The right solution is to simply remove the plan node, as the watermark should 
not affect any batch query in any way.

Changes:
- In this PR, we add a new rule, `EliminateEventTimeWatermark`, that checks
whether the event-time watermark should be ignored. The watermark is ignored in
any batch query (a sketch of such a rule appears below).

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot
add this rule to the analyzer directly, because the streaming query is copied
to `triggerLogicalPlan` on every trigger, and the rule would then be applied to
`triggerLogicalPlan` by mistake.

Others:
- A typo fix in an example.
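
A minimal sketch of what such an elimination rule can look like, assuming the
Spark 2.2 Catalyst APIs (illustrative only, not necessarily the exact code
added by this patch):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Drop EventTimeWatermark nodes whose child is not a streaming plan, so that
// batch queries simply ignore withWatermark instead of failing to plan.
object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
  }
}
```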

## How was this patch tested?

Added a new unit test.

Author: uncleGen 

Closes #17896 from uncleGen/SPARK-20373.

(cherry picked from commit c0189abc7c6ddbecc1832d2ff0cfc5546a010b60)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7600a7ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7600a7ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7600a7ab

Branch: refs/heads/branch-2.2
Commit: 7600a7ab65777a59f3a33edef40328b6a5d864ef
Parents: d191b96
Author: uncleGen 
Authored: Tue May 9 15:08:09 2017 -0700
Committer: Shixiong Zhu 
Committed: Tue May 9 15:08:38 2017 -0700

--
 docs/structured-streaming-programming-guide.md|  3 +++
 .../examples/sql/streaming/StructuredSessionization.scala |  4 ++--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala |  3 ++-
 .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 10 ++
 5 files changed, 27 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 53b3db2..bd01be9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not supported fine-grained 
updates that Update Mode
 with them, we have also support Append Mode, where only the *final counts* are 
written to sink.
 This is illustrated below.
 
+Note that using `withWatermark` on a non-streaming Dataset is no-op. As the 
watermark should not affect 
+any batch query in any way, we will ignore it directly.
+
 ![Watermarking in Append 
Mode](img/structured-streaming-watermark-append-mode.png)
 
 Similar to the Update Mode earlier, the engine maintains intermediate counts 
for each window. 

http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index 2ce792c..ed63fb6 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk `
  * and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
  * localhost `
  */
 object StructuredSessionization {
 
   def main(args: Array[String]): Unit = {
 if (args.length < 2) {
-  System.err.println("Usage: StructuredNetworkWordCount  ")
+  System.err.println("Usage: StructuredSessionization  ")
   System.exit(1)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 

spark git commit: [SPARK-20373][SQL][SS] Batch queries with 'Dataset/DataFrame.withWatermark()` does not execute

2017-05-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f79aa285c -> c0189abc7


[SPARK-20373][SQL][SS] Batch queries with 'Dataset/DataFrame.withWatermark()` 
does not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query with the operation `withWatermark` does not 
execute because the batch planner does not have any rule to explicitly handle 
the EventTimeWatermark logical plan.
The right solution is to simply remove the plan node, as the watermark should 
not affect any batch query in any way.

Changes:
- In this PR, we add a new rule, `EliminateEventTimeWatermark`, that checks
whether the event-time watermark should be ignored. The watermark is ignored in
any batch query (a batch usage sketch follows below).

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot
add this rule to the analyzer directly, because the streaming query is copied
to `triggerLogicalPlan` on every trigger, and the rule would then be applied to
`triggerLogicalPlan` by mistake.

Others:
- A typo fix in an example.
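
As a usage illustration, here is a batch query that previously failed to plan
and now runs with the watermark ignored (a minimal sketch; the local session
and sample data are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{to_timestamp, window}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("batch-watermark")
  .getOrCreate()
import spark.implicits._

// A plain (non-streaming) Dataset with an event-time column.
val events = Seq(("2017-05-09 10:00:00", 1), ("2017-05-09 10:07:00", 2))
  .toDF("time", "value")
  .withColumn("time", to_timestamp($"time"))

// With this patch the watermark is a no-op for batch queries, so this runs as
// an ordinary windowed aggregation instead of failing at planning time.
events.withWatermark("time", "10 minutes")
  .groupBy(window($"time", "5 minutes"))
  .count()
  .show()
```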

## How was this patch tested?

Added a new unit test.

Author: uncleGen 

Closes #17896 from uncleGen/SPARK-20373.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0189abc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0189abc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0189abc

Branch: refs/heads/master
Commit: c0189abc7c6ddbecc1832d2ff0cfc5546a010b60
Parents: f79aa28
Author: uncleGen 
Authored: Tue May 9 15:08:09 2017 -0700
Committer: Shixiong Zhu 
Committed: Tue May 9 15:08:09 2017 -0700

--
 docs/structured-streaming-programming-guide.md|  3 +++
 .../examples/sql/streaming/StructuredSessionization.scala |  4 ++--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala |  3 ++-
 .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 10 ++
 5 files changed, 27 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 53b3db2..bd01be9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not supported fine-grained 
updates that Update Mode
 with them, we have also support Append Mode, where only the *final counts* are 
written to sink.
 This is illustrated below.
 
+Note that using `withWatermark` on a non-streaming Dataset is no-op. As the 
watermark should not affect 
+any batch query in any way, we will ignore it directly.
+
 ![Watermarking in Append 
Mode](img/structured-streaming-watermark-append-mode.png)
 
 Similar to the Update Mode earlier, the engine maintains intermediate counts 
for each window. 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index 2ce792c..ed63fb6 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk `
  * and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
  * localhost `
  */
 object StructuredSessionization {
 
   def main(args: Array[String]): Unit = {
 if (args.length < 2) {
-  System.err.println("Usage: StructuredNetworkWordCount  ")
+  System.err.println("Usage: StructuredSessionization  ")
   System.exit(1)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 72e7d5d..c56dd36 100644
--- 

spark git commit: Revert "[SPARK-20311][SQL] Support aliases for table value functions"

2017-05-09 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 9e8d23b3a -> d191b962d


Revert "[SPARK-20311][SQL] Support aliases for table value functions"

This reverts commit 714811d0b5bcb5d47c39782ff74f898d276ecc59.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d191b962
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d191b962
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d191b962

Branch: refs/heads/branch-2.2
Commit: d191b962dc81c015fa92a38d882a8c7ea620ef06
Parents: 9e8d23b
Author: Yin Huai 
Authored: Tue May 9 14:47:45 2017 -0700
Committer: Yin Huai 
Committed: Tue May 9 14:49:02 2017 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 ++
 .../analysis/ResolveTableValuedFunctions.scala  | 22 +++-
 .../sql/catalyst/analysis/unresolved.scala  | 10 ++---
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 17 ---
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +
 .../sql/catalyst/parser/PlanParserSuite.scala   | 13 +---
 6 files changed, 17 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d191b962/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 15e4dd4..1ecb3d1 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,23 +472,15 @@ identifierComment
 ;
 
 relationPrimary
-: tableIdentifier sample? (AS? strictIdentifier)?  #tableName
-| '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
-| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation
-| inlineTable  #inlineTableDefault2
-| functionTable#tableValuedFunction
+: tableIdentifier sample? (AS? strictIdentifier)?   #tableName
+| '(' queryNoWith ')' sample? (AS? strictIdentifier)?   
#aliasedQuery
+| '(' relation ')' sample? (AS? strictIdentifier)?  
#aliasedRelation
+| inlineTable   
#inlineTableDefault2
+| identifier '(' (expression (',' expression)*)? ')'
#tableValuedFunction
 ;
 
 inlineTable
-: VALUES expression (',' expression)*  tableAlias
-;
-
-functionTable
-: identifier '(' (expression (',' expression)*)? ')' tableAlias
-;
-
-tableAlias
-: (AS? identifier identifierList?)?
+: VALUES expression (',' expression)*  (AS? identifier identifierList?)?
 ;
 
 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/d191b962/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index dad1340..de6de24 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Range}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
 
@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
 case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) 
=>
-  val resolvedFunc = 
builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
+  builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
 case Some(tvf) =>
   val resolved = tvf.flatMap { case (argList, resolver) =>
 argList.implicitCast(u.functionArgs) match {
@@ -125,21 +125,5 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] 

spark git commit: Revert "[SPARK-20311][SQL] Support aliases for table value functions"

2017-05-09 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master ac1ab6b9d -> f79aa285c


Revert "[SPARK-20311][SQL] Support aliases for table value functions"

This reverts commit 714811d0b5bcb5d47c39782ff74f898d276ecc59.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f79aa285
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f79aa285
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f79aa285

Branch: refs/heads/master
Commit: f79aa285cf115963ba06a9cacb3dbd7e3cbf7728
Parents: ac1ab6b
Author: Yin Huai 
Authored: Tue May 9 14:47:45 2017 -0700
Committer: Yin Huai 
Committed: Tue May 9 14:47:45 2017 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 ++
 .../analysis/ResolveTableValuedFunctions.scala  | 22 +++-
 .../sql/catalyst/analysis/unresolved.scala  | 10 ++---
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 17 ---
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +
 .../sql/catalyst/parser/PlanParserSuite.scala   | 13 +---
 6 files changed, 17 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f79aa285/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 41daf58..14c511f 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,23 +472,15 @@ identifierComment
 ;
 
 relationPrimary
-: tableIdentifier sample? (AS? strictIdentifier)?  #tableName
-| '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
-| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation
-| inlineTable  #inlineTableDefault2
-| functionTable#tableValuedFunction
+: tableIdentifier sample? (AS? strictIdentifier)?   #tableName
+| '(' queryNoWith ')' sample? (AS? strictIdentifier)?   
#aliasedQuery
+| '(' relation ')' sample? (AS? strictIdentifier)?  
#aliasedRelation
+| inlineTable   
#inlineTableDefault2
+| identifier '(' (expression (',' expression)*)? ')'
#tableValuedFunction
 ;
 
 inlineTable
-: VALUES expression (',' expression)*  tableAlias
-;
-
-functionTable
-: identifier '(' (expression (',' expression)*)? ')' tableAlias
-;
-
-tableAlias
-: (AS? identifier identifierList?)?
+: VALUES expression (',' expression)*  (AS? identifier identifierList?)?
 ;
 
 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/f79aa285/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index dad1340..de6de24 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Range}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
 
@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
 case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) 
=>
-  val resolvedFunc = 
builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
+  builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
 case Some(tvf) =>
   val resolved = tvf.flatMap { case (argList, resolver) =>
 argList.implicitCast(u.functionArgs) match {
@@ -125,21 +125,5 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
   

spark git commit: Revert "[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps"

2017-05-09 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 1b85bcd92 -> ac1ab6b9d


Revert "[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps"

This reverts commit 22691556e5f0dfbac81b8cc9ca0a67c70c1711ca.

See JIRA ticket for more information.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac1ab6b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac1ab6b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac1ab6b9

Branch: refs/heads/master
Commit: ac1ab6b9db188ac54c745558d57dd0a031d0b162
Parents: 1b85bcd
Author: Reynold Xin 
Authored: Tue May 9 11:35:59 2017 -0700
Committer: Reynold Xin 
Committed: Tue May 9 11:35:59 2017 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala |   5 -
 .../parquet/VectorizedColumnReader.java |  28 +-
 .../parquet/VectorizedParquetRecordReader.java  |   6 +-
 .../spark/sql/execution/command/tables.scala|   8 +-
 .../datasources/parquet/ParquetFileFormat.scala |   2 -
 .../parquet/ParquetReadSupport.scala|   3 +-
 .../parquet/ParquetRecordMaterializer.scala |   9 +-
 .../parquet/ParquetRowConverter.scala   |  53 +--
 .../parquet/ParquetWriteSupport.scala   |  25 +-
 .../spark/sql/hive/HiveExternalCatalog.scala|  11 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../hive/ParquetHiveCompatibilitySuite.scala| 379 +--
 13 files changed, 29 insertions(+), 516 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c39017e..cc0cbba 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
   /**
* Given the partition schema, returns a row with that schema holding the 
partition values.
*/
-  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): 
InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZondId: String): 
InternalRow = {
 val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
 val timeZoneId = caseInsensitiveProperties.getOrElse(
-  DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
+  DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
 InternalRow.fromSeq(partitionSchema.map { field =>
   val partValue = if (spec(field.name) == 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
 null

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index bf596fa..6c1592f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -498,11 +498,6 @@ object DateTimeUtils {
 false
   }
 
-  lazy val validTimezones = TimeZone.getAvailableIDs().toSet
-  def isValidTimezone(timezoneId: String): Boolean = {
-validTimezones.contains(timezoneId)
-  }
-
   /**
* Returns the microseconds since year zero (-17999) from microseconds since 
epoch.
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index dabbc2b..9d641b5 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,9 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
-import java.util.TimeZone;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesUtils;
 import 

spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

2017-05-09 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c7bd909f6 -> 9e8d23b3a


[SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

## What changes were proposed in this pull request?

Drop the Hadoop distribution name from the Python version (PEP 440 -
https://www.python.org/dev/peps/pep-0440/). We've been using the local version
string to disambiguate between different Hadoop versions packaged with PySpark,
but PEP 440 states that local versions should not be used when publishing
upstream. Since we no longer make PySpark pip packages for different Hadoop
versions, we can simply drop the Hadoop information. If at a later point we
need to start publishing different Hadoop versions, we can look at making
different packages or similar.
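
To make the effect concrete, a before/after sketch of the version string the
script generates (the `NAME` value here is a hypothetical example; the sed
pipeline mirrors the one in `release-build.sh` shown in the diff below):

```bash
SPARK_VERSION="2.2.0-SNAPSHOT"
NAME="hadoop2.7"   # hypothetical package name

# Before: the Hadoop name rode along as a PEP 440 "local version" segment.
echo "$SPARK_VERSION+$NAME" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"
# -> 2.2.0.dev0+hadoop2.7

# After: the Hadoop name is dropped, giving a version publishable upstream.
echo "$SPARK_VERSION" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"
# -> 2.2.0.dev0
```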

## How was this patch tested?

Ran `make-distribution` locally

Author: Holden Karau 

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.

(cherry picked from commit 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e)
Signed-off-by: Holden Karau 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e8d23b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e8d23b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e8d23b3

Branch: refs/heads/branch-2.2
Commit: 9e8d23b3a2f99985ffb3c4eb67ac0a2774fa5b02
Parents: c7bd909
Author: Holden Karau 
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau 
Committed: Tue May 9 11:26:00 2017 -0700

--
 dev/create-release/release-build.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e8d23b3/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index 7976d8a..a72307a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
 export ZINC_PORT=$ZINC_PORT
 echo "Creating distribution: $NAME ($FLAGS)"
 
-# Write out the NAME and VERSION to PySpark version info we rewrite the - 
into a . and SNAPSHOT
-# to dev0 to be closer to PEP440. We use the NAME as a "local version".
-PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
+# Write out the VERSION to PySpark version info we rewrite the - into a . 
and SNAPSHOT
+# to dev0 to be closer to PEP440.
+PYSPARK_VERSION=`echo "$SPARK_VERSION" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
 echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py
 
 # Get maven home set by MVN





spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

2017-05-09 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f7a91a17e -> 12c937ede


[SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

## What changes were proposed in this pull request?

Drop the Hadoop distribution name from the Python version (PEP 440 -
https://www.python.org/dev/peps/pep-0440/). We've been using the local version
string to disambiguate between different Hadoop versions packaged with PySpark,
but PEP 440 states that local versions should not be used when publishing
upstream. Since we no longer make PySpark pip packages for different Hadoop
versions, we can simply drop the Hadoop information. If at a later point we
need to start publishing different Hadoop versions, we can look at making
different packages or similar.

## How was this patch tested?

Ran `make-distribution` locally

Author: Holden Karau 

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.

(cherry picked from commit 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e)
Signed-off-by: Holden Karau 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12c937ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12c937ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12c937ed

Branch: refs/heads/branch-2.1
Commit: 12c937ede309d6886ce1ceadff2691f31dcdd6d3
Parents: f7a91a1
Author: Holden Karau 
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau 
Committed: Tue May 9 11:26:25 2017 -0700

--
 dev/create-release/release-build.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/12c937ed/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index ab17f2f..c4ddc21 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
 export ZINC_PORT=$ZINC_PORT
 echo "Creating distribution: $NAME ($FLAGS)"
 
-# Write out the NAME and VERSION to PySpark version info we rewrite the - 
into a . and SNAPSHOT
-# to dev0 to be closer to PEP440. We use the NAME as a "local version".
-PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
+# Write out the VERSION to PySpark version info we rewrite the - into a . 
and SNAPSHOT
+# to dev0 to be closer to PEP440.
+PYSPARK_VERSION=`echo "$SPARK_VERSION" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
 echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py
 
 # Get maven home set by MVN





spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

2017-05-09 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 25ee816e0 -> 1b85bcd92


[SPARK-20627][PYSPARK] Drop the hadoop distirbution name from the Python version

## What changes were proposed in this pull request?

Drop the Hadoop distribution name from the Python version (PEP 440 -
https://www.python.org/dev/peps/pep-0440/). We've been using the local version
string to disambiguate between different Hadoop versions packaged with PySpark,
but PEP 440 states that local versions should not be used when publishing
upstream. Since we no longer make PySpark pip packages for different Hadoop
versions, we can simply drop the Hadoop information. If at a later point we
need to start publishing different Hadoop versions, we can look at making
different packages or similar.

## How was this patch tested?

Ran `make-distribution` locally

Author: Holden Karau 

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b85bcd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b85bcd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b85bcd9

Branch: refs/heads/master
Commit: 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e
Parents: 25ee816
Author: Holden Karau 
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau 
Committed: Tue May 9 11:25:29 2017 -0700

--
 dev/create-release/release-build.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b85bcd9/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index 7976d8a..a72307a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
 export ZINC_PORT=$ZINC_PORT
 echo "Creating distribution: $NAME ($FLAGS)"
 
-# Write out the NAME and VERSION to PySpark version info we rewrite the - 
into a . and SNAPSHOT
-# to dev0 to be closer to PEP440. We use the NAME as a "local version".
-PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
+# Write out the VERSION to PySpark version info we rewrite the - into a . 
and SNAPSHOT
+# to dev0 to be closer to PEP440.
+PYSPARK_VERSION=`echo "$SPARK_VERSION" |  sed -r "s/-/./" | sed -r 
"s/SNAPSHOT/dev0/"`
 echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py
 
 # Get maven home set by MVN





spark git commit: [SPARK-19876][BUILD] Move Trigger.java to java source hierarchy

2017-05-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 73aa23b8e -> c7bd909f6


[SPARK-19876][BUILD] Move Trigger.java to java source hierarchy

## What changes were proposed in this pull request?

Simply moves `Trigger.java` from `src/main/scala` to `src/main/java`.
See https://github.com/apache/spark/pull/17219

## How was this patch tested?

Existing tests.

Author: Sean Owen 

Closes #17921 from srowen/SPARK-19876.2.

(cherry picked from commit 25ee816e090c42f0e35be2d2cb0f8ec60726317c)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7bd909f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7bd909f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7bd909f

Branch: refs/heads/branch-2.2
Commit: c7bd909f67209b4d1354c3d5b0a0fb1d4e28f205
Parents: 73aa23b
Author: Sean Owen 
Authored: Tue May 9 10:22:23 2017 -0700
Committer: Herman van Hovell 
Committed: Tue May 9 10:22:32 2017 -0700

--
 .../org/apache/spark/sql/streaming/Trigger.java | 105 +++
 .../org/apache/spark/sql/streaming/Trigger.java | 105 ---
 2 files changed, 105 insertions(+), 105 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c7bd909f/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
--
diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java 
b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 000..3e3997f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+  return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+  return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *import scala.concurrent.duration._
+   *df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+  return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * 

spark git commit: [SPARK-19876][BUILD] Move Trigger.java to java source hierarchy

2017-05-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master d099f414d -> 25ee816e0


[SPARK-19876][BUILD] Move Trigger.java to java source hierarchy

## What changes were proposed in this pull request?

Simply moves `Trigger.java` to `src/main/java` from `src/main/scala`
See https://github.com/apache/spark/pull/17219

## How was this patch tested?

Existing tests.
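
For reference, a hedged usage sketch of the `Trigger` API defined in the moved file (assuming a streaming DataFrame named `df` and the console sink; this is not part of the patch, which only relocates the file):

```scala
import org.apache.spark.sql.streaming.Trigger

// Fire the streaming query every 10 seconds, mirroring the Javadoc
// example in Trigger.java.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```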

Author: Sean Owen 

Closes #17921 from srowen/SPARK-19876.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25ee816e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25ee816e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25ee816e

Branch: refs/heads/master
Commit: 25ee816e090c42f0e35be2d2cb0f8ec60726317c
Parents: d099f41
Author: Sean Owen 
Authored: Tue May 9 10:22:23 2017 -0700
Committer: Herman van Hovell 
Committed: Tue May 9 10:22:23 2017 -0700

--
 .../org/apache/spark/sql/streaming/Trigger.java | 105 +++
 .../org/apache/spark/sql/streaming/Trigger.java | 105 ---
 2 files changed, 105 insertions(+), 105 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/25ee816e/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
--
diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java 
b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 000..3e3997f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+  return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+  return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *import scala.concurrent.duration._
+   *df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+  return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+  return ProcessingTime.apply(interval);
+  }
+
+  /**
+   

spark git commit: [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF

2017-05-09 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 08e1b78f0 -> 73aa23b8e


[SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF

## What changes were proposed in this pull request?
For some reason we don't have an API to register a UserDefinedFunction as a named
UDF. It is a no-brainer to add one, in addition to the existing register
functions we already have.

## How was this patch tested?
Added a test case in UDFSuite for the new API.
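
For illustration, a minimal usage sketch of the new overload (assuming an active `SparkSession` named `spark`), mirroring the test added in `UDFSuite`:

```scala
import org.apache.spark.sql.functions.udf

// Define a UDF with the DataFrame API...
val plusOne = udf((x: Int) => x + 1)

// ...then register it under a name so it is also callable from SQL.
spark.udf.register("plusOne", plusOne)
spark.sql("SELECT plusOne(5)").show()  // 6
```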

Author: Reynold Xin 

Closes #17915 from rxin/SPARK-20674.

(cherry picked from commit d099f414d2cb53f5a61f6e77317c736be6f953a0)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73aa23b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73aa23b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73aa23b8

Branch: refs/heads/branch-2.2
Commit: 73aa23b8ef64960e7f171aa07aec396667a2339d
Parents: 08e1b78
Author: Reynold Xin 
Authored: Tue May 9 09:24:28 2017 -0700
Committer: Xiao Li 
Committed: Tue May 9 09:24:36 2017 -0700

--
 .../org/apache/spark/sql/UDFRegistration.scala  | 22 +---
 .../scala/org/apache/spark/sql/UDFSuite.scala   |  7 +++
 2 files changed, 26 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73aa23b8/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index a576733..6accf1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -70,15 +70,31 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
* @param name the name of the UDAF.
* @param udaf the UDAF needs to be registered.
* @return the registered UDAF.
+   *
+   * @since 1.5.0
*/
-  def register(
-  name: String,
-  udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
+  def register(name: String, udaf: UserDefinedAggregateFunction): 
UserDefinedAggregateFunction = {
 def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf)
 functionRegistry.registerFunction(name, builder)
 udaf
   }
 
+  /**
+   * Register a user-defined function (UDF), for a UDF that's already defined 
using the DataFrame
+   * API (i.e. of type UserDefinedFunction).
+   *
+   * @param name the name of the UDF.
+   * @param udf the UDF needs to be registered.
+   * @return the registered UDF.
+   *
+   * @since 2.2.0
+   */
+  def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {
+def builder(children: Seq[Expression]) = 
udf.apply(children.map(Column.apply) : _*).expr
+functionRegistry.registerFunction(name, builder)
+udf
+  }
+
   // scalastyle:off line.size.limit
 
   /* register 0-22 were generated by this script

http://git-wip-us.apache.org/repos/asf/spark/blob/73aa23b8/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index ae6b2bc..6f8723a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -93,6 +93,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
 assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
   }
 
+  test("UDF defined using UserDefinedFunction") {
+import functions.udf
+val foo = udf((x: Int) => x + 1)
+spark.udf.register("foo", foo)
+assert(sql("select foo(5)").head().getInt(0) == 6)
+  }
+
   test("ZeroArgument UDF") {
 spark.udf.register("random0", () => { Math.random()})
 assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)





spark git commit: [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF

2017-05-09 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f561a76b2 -> d099f414d


[SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF

## What changes were proposed in this pull request?
For some reason we don't have an API to register a UserDefinedFunction as a named
UDF. It is a no-brainer to add one, in addition to the existing register
functions we already have.

## How was this patch tested?
Added a test case in UDFSuite for the new API.

Author: Reynold Xin 

Closes #17915 from rxin/SPARK-20674.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d099f414
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d099f414
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d099f414

Branch: refs/heads/master
Commit: d099f414d2cb53f5a61f6e77317c736be6f953a0
Parents: f561a76
Author: Reynold Xin 
Authored: Tue May 9 09:24:28 2017 -0700
Committer: Xiao Li 
Committed: Tue May 9 09:24:28 2017 -0700

--
 .../org/apache/spark/sql/UDFRegistration.scala  | 22 +---
 .../scala/org/apache/spark/sql/UDFSuite.scala   |  7 +++
 2 files changed, 26 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d099f414/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index a576733..6accf1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -70,15 +70,31 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
* @param name the name of the UDAF.
* @param udaf the UDAF needs to be registered.
* @return the registered UDAF.
+   *
+   * @since 1.5.0
*/
-  def register(
-  name: String,
-  udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
+  def register(name: String, udaf: UserDefinedAggregateFunction): 
UserDefinedAggregateFunction = {
 def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf)
 functionRegistry.registerFunction(name, builder)
 udaf
   }
 
+  /**
+   * Register a user-defined function (UDF), for a UDF that's already defined 
using the DataFrame
+   * API (i.e. of type UserDefinedFunction).
+   *
+   * @param name the name of the UDF.
+   * @param udf the UDF needs to be registered.
+   * @return the registered UDF.
+   *
+   * @since 2.2.0
+   */
+  def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {
+def builder(children: Seq[Expression]) = 
udf.apply(children.map(Column.apply) : _*).expr
+functionRegistry.registerFunction(name, builder)
+udf
+  }
+
   // scalastyle:off line.size.limit
 
   /* register 0-22 were generated by this script

http://git-wip-us.apache.org/repos/asf/spark/blob/d099f414/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index ae6b2bc..6f8723a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -93,6 +93,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
 assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
   }
 
+  test("UDF defined using UserDefinedFunction") {
+import functions.udf
+val foo = udf((x: Int) => x + 1)
+spark.udf.register("foo", foo)
+assert(sql("select foo(5)").head().getInt(0) == 6)
+  }
+
   test("ZeroArgument UDF") {
 spark.udf.register("random0", () => { Math.random()})
 assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)





spark git commit: [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 272d2a10d -> 08e1b78f0


[SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases

`ReplSuite.newProductSeqEncoder with REPL defined class` was flaky and frequently
threw OOM exceptions. By analyzing the heap dump, we found the reason: in each
test case of `ReplSuite`, we create a REPL instance, which creates a classloader
and loads a lot of classes related to `SparkContext`. For more details, see
https://github.com/apache/spark/pull/17833#issuecomment-298711435.

In this PR, we create a new test suite, `SingletonReplSuite`, which shares one
REPL instance among all the test cases. We then move most of the tests from
`ReplSuite` to `SingletonReplSuite`, to avoid creating many REPL instances
and to reduce the memory footprint.

test only change

Author: Wenchen Fan 

Closes #17844 from cloud-fan/flaky-test.

(cherry picked from commit f561a76b2f895dea52f228a9376948242c3331ad)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e1b78f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e1b78f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e1b78f

Branch: refs/heads/branch-2.2
Commit: 08e1b78f01955c7151d9e984d392d45deced6e34
Parents: 272d2a1
Author: Wenchen Fan 
Authored: Wed May 10 00:09:35 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 10 00:11:25 2017 +0800

--
 .../main/scala/org/apache/spark/repl/Main.scala |   2 +-
 .../org/apache/spark/repl/SparkILoop.scala  |   9 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala | 271 +---
 .../apache/spark/repl/SingletonReplSuite.scala  | 408 +++
 4 files changed, 412 insertions(+), 278 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
--
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 39fc621..b8b38e8 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -68,7 +68,7 @@ object Main extends Logging {
 
 if (!hasErrors) {
   interp.process(settings) // Repl starts and goes in loop of R.E.P.L
-  Option(sparkContext).map(_.stop)
+  Option(sparkContext).foreach(_.stop)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 76a66c1..d1d25b7 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -86,15 +86,8 @@ class SparkILoop(in0: Option[BufferedReader], out: 
JPrintWriter)
 echo("Type :help for more information.")
   }
 
-  /** Add repl commands that needs to be blocked. e.g. reset */
-  private val blockedCommands = Set[String]()
-
-  /** Standard commands */
-  lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
-standardCommands.filter(cmd => !blockedCommands(cmd.name))
-
   /** Available commands */
-  override def commands: List[LoopCommand] = sparkStandardCommands
+  override def commands: List[LoopCommand] = standardCommands
 
   /**
* We override `loadFiles` because we need to initialize Spark *before* the 
REPL

http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 121a02a..c7ae194 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -21,12 +21,12 @@ import java.io._
 import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
-import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.log4j.{Level, LogManager}
+
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.util.Utils
 
 class ReplSuite extends 

spark git commit: [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 181261a81 -> f561a76b2


[SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases

## What changes were proposed in this pull request?

`ReplSuite.newProductSeqEncoder with REPL defined class` was flaky and frequently
threw OOM exceptions. By analyzing the heap dump, we found the reason: in each
test case of `ReplSuite`, we create a REPL instance, which creates a classloader
and loads a lot of classes related to `SparkContext`. For more details, see
https://github.com/apache/spark/pull/17833#issuecomment-298711435.

In this PR, we create a new test suite, `SingletonReplSuite`, which shares one
REPL instance among all the test cases. We then move most of the tests from
`ReplSuite` to `SingletonReplSuite`, to avoid creating many REPL instances
and to reduce the memory footprint.

## How was this patch tested?

test only change

Author: Wenchen Fan 

Closes #17844 from cloud-fan/flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f561a76b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f561a76b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f561a76b

Branch: refs/heads/master
Commit: f561a76b2f895dea52f228a9376948242c3331ad
Parents: 181261a
Author: Wenchen Fan 
Authored: Wed May 10 00:09:35 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 10 00:09:35 2017 +0800

--
 .../main/scala/org/apache/spark/repl/Main.scala |   2 +-
 .../org/apache/spark/repl/SparkILoop.scala  |   9 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala | 272 +
 .../apache/spark/repl/SingletonReplSuite.scala  | 408 +++
 4 files changed, 412 insertions(+), 279 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
--
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 39fc621..b8b38e8 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -68,7 +68,7 @@ object Main extends Logging {
 
 if (!hasErrors) {
   interp.process(settings) // Repl starts and goes in loop of R.E.P.L
-  Option(sparkContext).map(_.stop)
+  Option(sparkContext).foreach(_.stop)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 76a66c1..d1d25b7 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -86,15 +86,8 @@ class SparkILoop(in0: Option[BufferedReader], out: 
JPrintWriter)
 echo("Type :help for more information.")
   }
 
-  /** Add repl commands that needs to be blocked. e.g. reset */
-  private val blockedCommands = Set[String]()
-
-  /** Standard commands */
-  lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
-standardCommands.filter(cmd => !blockedCommands(cmd.name))
-
   /** Available commands */
-  override def commands: List[LoopCommand] = sparkStandardCommands
+  override def commands: List[LoopCommand] = standardCommands
 
   /**
* We override `loadFiles` because we need to initialize Spark *before* the 
REPL

http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8fe2708..c7ae194 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -21,12 +21,12 @@ import java.io._
 import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
-import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.log4j.{Level, LogManager}
+
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.util.Utils
 
 class ReplSuite extends SparkFunSuite {
 
@@ -148,71 +148,6 @@ class 

spark git commit: [SPARK-20355] Add per application spark version on the history server headerpage

2017-05-09 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 714811d0b -> 181261a81


[SPARK-20355] Add per application spark version on the history server headerpage

## What changes were proposed in this pull request?

The Spark version for a specific application is not currently displayed on the
history page. It would be nice to show the Spark version in the UI when we click
on a specific application.
There is a way to do this, since SparkListenerLogStart records the application's
Spark version, so it should be trivial to listen to this event and surface the
version in the UI.
For example (screenshots):
https://cloud.githubusercontent.com/assets/8295799/25092650/41f3970a-2354-11e7-9b0d-4646d0adeb61.png
https://cloud.githubusercontent.com/assets/8295799/25092743/9f9e2f28-2354-11e7-9605-f2f1c63f21fe.png

{"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}
Modified the SparkUI for the history server to listen to the SparkListenerLogStart
event, extract the version, and display it.
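
For illustration, a minimal sketch (not part of the patch) of pulling the version out of the `SparkListenerLogStart` line of an event log, assuming a locally readable log file and json4s-jackson on the classpath (as bundled with Spark):

```scala
import scala.io.Source

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object EventLogVersion {
  implicit val formats: Formats = DefaultFormats

  // The SparkListenerLogStart event looks like:
  // {"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}
  def sparkVersionOf(eventLogPath: String): Option[String] = {
    val source = Source.fromFile(eventLogPath)
    try {
      source.getLines()
        .find(_.contains("SparkListenerLogStart"))
        .map(line => (parse(line) \ "Spark Version").extract[String])
    } finally {
      source.close()
    }
  }
}
```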

## How was this patch tested?
Manual testing of the UI page; the updated screenshots are attached above.


Author: Sanket 

Closes #17658 from redsanket/SPARK-20355.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/181261a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/181261a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/181261a8

Branch: refs/heads/master
Commit: 181261a81d592b93181135a8267570e0c9ab2243
Parents: 714811d
Author: Sanket 
Authored: Tue May 9 09:30:09 2017 -0500
Committer: Tom Graves 
Committed: Tue May 9 09:30:09 2017 -0500

--
 .../history/ApplicationHistoryProvider.scala   |  3 ++-
 .../spark/deploy/history/FsHistoryProvider.scala   | 17 -
 .../spark/scheduler/ApplicationEventListener.scala |  7 +++
 .../spark/scheduler/EventLoggingListener.scala | 13 ++---
 .../org/apache/spark/scheduler/SparkListener.scala |  4 ++--
 .../apache/spark/scheduler/SparkListenerBus.scala  |  1 -
 .../status/api/v1/ApplicationListResource.scala|  3 ++-
 .../scala/org/apache/spark/status/api/v1/api.scala |  3 ++-
 .../main/scala/org/apache/spark/ui/SparkUI.scala   |  6 +-
 .../main/scala/org/apache/spark/ui/UIUtils.scala   |  2 +-
 .../application_list_json_expectation.json | 10 ++
 .../completed_app_list_json_expectation.json   | 11 +++
 .../limit_app_list_json_expectation.json   |  3 +++
 .../maxDate2_app_list_json_expectation.json|  1 +
 .../maxDate_app_list_json_expectation.json |  2 ++
 .../maxEndDate_app_list_json_expectation.json  |  7 +++
 ...e_and_maxEndDate_app_list_json_expectation.json |  4 
 .../minDate_app_list_json_expectation.json |  8 
 ...e_and_maxEndDate_app_list_json_expectation.json |  4 
 .../minEndDate_app_list_json_expectation.json  |  6 +-
 .../one_app_json_expectation.json  |  1 +
 .../one_app_multi_attempt_json_expectation.json|  2 ++
 .../deploy/history/ApplicationCacheSuite.scala |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala|  4 ++--
 project/MimaExcludes.scala |  3 +++
 25 files changed, 107 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/181261a8/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 6d8758a..5cb48ca 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -30,7 +30,8 @@ private[spark] case class ApplicationAttemptInfo(
 endTime: Long,
 lastUpdated: Long,
 sparkUser: String,
-completed: Boolean = false)
+completed: Boolean = false,
+appSparkVersion: String)
 
 private[spark] case class ApplicationHistoryInfo(
 id: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/181261a8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 

spark git commit: [SPARK-20311][SQL] Support aliases for table value functions

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b3309676b -> 272d2a10d


[SPARK-20311][SQL] Support aliases for table value functions

## What changes were proposed in this pull request?
This PR adds parsing rules to support aliases in table valued functions.

## How was this patch tested?
Added tests in `PlanParserSuite`.
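
As an illustrative sketch (not taken from the patch), the new grammar should allow aliasing both a table valued function and its output columns, for example with the built-in `range` function (assuming an active `SparkSession` named `spark`):

```scala
// Hypothetical usage of the new alias rules: range(3) is aliased as
// table t with output column x.
spark.sql("SELECT t.x FROM range(3) AS t(x)").show()
```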

Author: Takeshi Yamamuro 

Closes #17666 from maropu/SPARK-20311.

(cherry picked from commit 714811d0b5bcb5d47c39782ff74f898d276ecc59)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/272d2a10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/272d2a10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/272d2a10

Branch: refs/heads/branch-2.2
Commit: 272d2a10d70588e1f80cc6579d4ec3c44b5bbfc2
Parents: b330967
Author: Takeshi Yamamuro 
Authored: Tue May 9 20:22:51 2017 +0800
Committer: Wenchen Fan 
Committed: Tue May 9 20:23:53 2017 +0800

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 --
 .../analysis/ResolveTableValuedFunctions.scala  | 22 +---
 .../sql/catalyst/analysis/unresolved.scala  | 10 +++--
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 17 +++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 -
 .../sql/catalyst/parser/PlanParserSuite.scala   | 13 +++-
 6 files changed, 79 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/272d2a10/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 1ecb3d1..15e4dd4 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,15 +472,23 @@ identifierComment
 ;
 
 relationPrimary
-: tableIdentifier sample? (AS? strictIdentifier)?   #tableName
-| '(' queryNoWith ')' sample? (AS? strictIdentifier)?   
#aliasedQuery
-| '(' relation ')' sample? (AS? strictIdentifier)?  
#aliasedRelation
-| inlineTable   
#inlineTableDefault2
-| identifier '(' (expression (',' expression)*)? ')'
#tableValuedFunction
+: tableIdentifier sample? (AS? strictIdentifier)?  #tableName
+| '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
+| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation
+| inlineTable  #inlineTableDefault2
+| functionTable#tableValuedFunction
 ;
 
 inlineTable
-: VALUES expression (',' expression)*  (AS? identifier identifierList?)?
+: VALUES expression (',' expression)*  tableAlias
+;
+
+functionTable
+: identifier '(' (expression (',' expression)*)? ')' tableAlias
+;
+
+tableAlias
+: (AS? identifier identifierList?)?
 ;
 
 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/272d2a10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index de6de24..dad1340 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
 
@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
 case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) 
=>
-  builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) 

spark git commit: [SPARK-20311][SQL] Support aliases for table value functions

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0d00c768a -> 714811d0b


[SPARK-20311][SQL] Support aliases for table value functions

## What changes were proposed in this pull request?
This PR adds parsing rules to support aliases in table valued functions.

## How was this patch tested?
Added tests in `PlanParserSuite`.

Author: Takeshi Yamamuro 

Closes #17666 from maropu/SPARK-20311.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/714811d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/714811d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/714811d0

Branch: refs/heads/master
Commit: 714811d0b5bcb5d47c39782ff74f898d276ecc59
Parents: 0d00c76
Author: Takeshi Yamamuro 
Authored: Tue May 9 20:22:51 2017 +0800
Committer: Wenchen Fan 
Committed: Tue May 9 20:22:51 2017 +0800

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 --
 .../analysis/ResolveTableValuedFunctions.scala  | 22 +---
 .../sql/catalyst/analysis/unresolved.scala  | 10 +++--
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 17 +++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 -
 .../sql/catalyst/parser/PlanParserSuite.scala   | 13 +++-
 6 files changed, 79 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/714811d0/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 14c511f..41daf58 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,15 +472,23 @@ identifierComment
 ;
 
 relationPrimary
-: tableIdentifier sample? (AS? strictIdentifier)?   #tableName
-| '(' queryNoWith ')' sample? (AS? strictIdentifier)?   
#aliasedQuery
-| '(' relation ')' sample? (AS? strictIdentifier)?  
#aliasedRelation
-| inlineTable   
#inlineTableDefault2
-| identifier '(' (expression (',' expression)*)? ')'
#tableValuedFunction
+: tableIdentifier sample? (AS? strictIdentifier)?  #tableName
+| '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
+| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation
+| inlineTable  #inlineTableDefault2
+| functionTable#tableValuedFunction
 ;
 
 inlineTable
-: VALUES expression (',' expression)*  (AS? identifier identifierList?)?
+: VALUES expression (',' expression)*  tableAlias
+;
+
+functionTable
+: identifier '(' (expression (',' expression)*)? ')' tableAlias
+;
+
+tableAlias
+: (AS? identifier identifierList?)?
 ;
 
 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/714811d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index de6de24..dad1340 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
 
@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
 case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) 
=>
-  builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
+  val resolvedFunc = 
builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
 case Some(tvf) 

spark git commit: [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4b7aa0b1d -> b3309676b


[SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the 
package of sql/core and sql/hive

## What changes were proposed in this pull request?

So far, we do not drop all the cataloged objects after each package. Sometimes we
hit strange test case errors because a previous test suite did not drop its
cataloged/temporary objects (tables/functions/databases). As a first step, we can
clean up the environment after completing the `sql/core` and `sql/hive` packages.

## How was this patch tested?
N/A

Author: Xiao Li 

Closes #17908 from gatorsmile/reset.

(cherry picked from commit 0d00c768a860fc03402c8f0c9081b8147c29133e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3309676
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3309676
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3309676

Branch: refs/heads/branch-2.2
Commit: b3309676bb83a80d38b916066d046866a6f42ef0
Parents: 4b7aa0b
Author: Xiao Li 
Authored: Tue May 9 20:10:50 2017 +0800
Committer: Wenchen Fan 
Committed: Tue May 9 20:11:08 2017 +0800

--
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala| 3 ++-
 .../scala/org/apache/spark/sql/test/SharedSQLContext.scala| 1 +
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala  | 7 +--
 3 files changed, 4 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6c6d600..18e5146 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1251,9 +1251,10 @@ class SessionCatalog(
 dropTempFunction(func.funcName, ignoreIfNotExists = false)
   }
 }
-tempTables.clear()
+clearTempTables()
 globalTempViewManager.clear()
 functionRegistry.clear()
+tableRelationCache.invalidateAll()
 // restore built-in functions
 FunctionRegistry.builtin.listFunction().foreach { f =>
   val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)

http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 81c69a3..7cea4c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -74,6 +74,7 @@ trait SharedSQLContext extends SQLTestUtils with 
BeforeAndAfterEach with Eventua
   protected override def afterAll(): Unit = {
 super.afterAll()
 if (_spark != null) {
+  _spark.sessionState.catalog.reset()
   _spark.stop()
   _spark = null
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d9bb1f8..ee9ac21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -488,14 +488,9 @@ private[hive] class TestHiveSparkSession(
 
   sharedState.cacheManager.clearCache()
   loadedTables.clear()
-  sessionState.catalog.clearTempTables()
-  sessionState.catalog.tableRelationCache.invalidateAll()
-
+  sessionState.catalog.reset()
   metadataHive.reset()
 
-  
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
-foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
-
   // HDFS root scratch dir requires the write all (733) permission. For 
each connecting user,
   // an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, 
with
   // ${hive.scratch.dir.permission}. To resolve the permission issue, the 
simplest way is to



spark git commit: [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive

2017-05-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master b8733e0ad -> 0d00c768a


[SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the 
package of sql/core and sql/hive

## What changes were proposed in this pull request?

So far, we do not drop all the cataloged objects after each package. Sometimes we
hit strange test case errors because a previous test suite did not drop its
cataloged/temporary objects (tables/functions/databases). As a first step, we can
clean up the environment after completing the `sql/core` and `sql/hive` packages.

## How was this patch tested?
N/A

Author: Xiao Li 

Closes #17908 from gatorsmile/reset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d00c768
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d00c768
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d00c768

Branch: refs/heads/master
Commit: 0d00c768a860fc03402c8f0c9081b8147c29133e
Parents: b8733e0
Author: Xiao Li 
Authored: Tue May 9 20:10:50 2017 +0800
Committer: Wenchen Fan 
Committed: Tue May 9 20:10:50 2017 +0800

--
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala| 3 ++-
 .../scala/org/apache/spark/sql/test/SharedSQLContext.scala| 1 +
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala  | 7 +--
 3 files changed, 4 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6c6d600..18e5146 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1251,9 +1251,10 @@ class SessionCatalog(
 dropTempFunction(func.funcName, ignoreIfNotExists = false)
   }
 }
-tempTables.clear()
+clearTempTables()
 globalTempViewManager.clear()
 functionRegistry.clear()
+tableRelationCache.invalidateAll()
 // restore built-in functions
 FunctionRegistry.builtin.listFunction().foreach { f =>
   val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 81c69a3..7cea4c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -74,6 +74,7 @@ trait SharedSQLContext extends SQLTestUtils with 
BeforeAndAfterEach with Eventua
   protected override def afterAll(): Unit = {
 super.afterAll()
 if (_spark != null) {
+  _spark.sessionState.catalog.reset()
   _spark.stop()
   _spark = null
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d9bb1f8..ee9ac21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -488,14 +488,9 @@ private[hive] class TestHiveSparkSession(
 
   sharedState.cacheManager.clearCache()
   loadedTables.clear()
-  sessionState.catalog.clearTempTables()
-  sessionState.catalog.tableRelationCache.invalidateAll()
-
+  sessionState.catalog.reset()
   metadataHive.reset()
 
-  
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
-foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
-
   // HDFS root scratch dir requires the write all (733) permission. For 
each connecting user,
   // an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, 
with
   // ${hive.scratch.dir.permission}. To resolve the permission issue, the 
simplest way is to



spark git commit: [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML

2017-05-09 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4bbfad44e -> 4b7aa0b1d


[SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML

## What changes were proposed in this pull request?
Remove ML methods we deprecated in 2.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #17867 from yanboliang/spark-20606.

(cherry picked from commit b8733e0ad9f5a700f385e210450fd2c10137293e)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b7aa0b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b7aa0b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b7aa0b1

Branch: refs/heads/branch-2.2
Commit: 4b7aa0b1dbd85e2238acba45e8f94c097358fb72
Parents: 4bbfad4
Author: Yanbo Liang 
Authored: Tue May 9 17:30:37 2017 +0800
Committer: Yanbo Liang 
Committed: Tue May 9 17:30:50 2017 +0800

--
 .../classification/DecisionTreeClassifier.scala |  18 ++--
 .../spark/ml/classification/GBTClassifier.scala |  24 ++---
 .../classification/RandomForestClassifier.scala |  24 ++---
 .../ml/regression/DecisionTreeRegressor.scala   |  18 ++--
 .../spark/ml/regression/GBTRegressor.scala  |  24 ++---
 .../ml/regression/RandomForestRegressor.scala   |  24 ++---
 .../org/apache/spark/ml/tree/treeParams.scala   | 105 ---
 .../org/apache/spark/ml/util/ReadWrite.scala|  16 ---
 project/MimaExcludes.scala  |  68 
 python/pyspark/ml/util.py   |  32 --
 10 files changed, 134 insertions(+), 219 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4b7aa0b1/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 9f60f08..5fb105c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -54,27 +54,27 @@ class DecisionTreeClassifier @Since("1.4.0") (
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMaxDepth(value: Int): this.type = set(maxDepth, value)
+  def setMaxDepth(value: Int): this.type = set(maxDepth, value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMaxBins(value: Int): this.type = set(maxBins, value)
+  def setMaxBins(value: Int): this.type = set(maxBins, value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMinInstancesPerNode(value: Int): this.type = 
set(minInstancesPerNode, value)
+  def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, 
value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMinInfoGain(value: Double): this.type = set(minInfoGain, 
value)
+  def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
 
   /** @group expertSetParam */
   @Since("1.4.0")
-  override def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, 
value)
+  def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value)
 
   /** @group expertSetParam */
   @Since("1.4.0")
-  override def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, 
value)
+  def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value)
 
   /**
* Specifies how often to checkpoint the cached node IDs.
@@ -86,15 +86,15 @@ class DecisionTreeClassifier @Since("1.4.0") (
* @group setParam
*/
   @Since("1.4.0")
-  override def setCheckpointInterval(value: Int): this.type = 
set(checkpointInterval, value)
+  def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, 
value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setImpurity(value: String): this.type = set(impurity, value)
+  def setImpurity(value: String): this.type = set(impurity, value)
 
   /** @group setParam */
   @Since("1.6.0")
-  override def setSeed(value: Long): this.type = set(seed, value)
+  def setSeed(value: Long): this.type = set(seed, value)
 
   override protected def train(dataset: Dataset[_]): 
DecisionTreeClassificationModel = {
 val categoricalFeatures: Map[Int, Int] =

http://git-wip-us.apache.org/repos/asf/spark/blob/4b7aa0b1/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala 

spark git commit: [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML

2017-05-09 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master be53a7835 -> b8733e0ad


[SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML

## What changes were proposed in this pull request?
Remove ML methods we deprecated in 2.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #17867 from yanboliang/spark-20606.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8733e0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8733e0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8733e0a

Branch: refs/heads/master
Commit: b8733e0ad9f5a700f385e210450fd2c10137293e
Parents: be53a78
Author: Yanbo Liang 
Authored: Tue May 9 17:30:37 2017 +0800
Committer: Yanbo Liang 
Committed: Tue May 9 17:30:37 2017 +0800

--
 .../classification/DecisionTreeClassifier.scala |  18 ++--
 .../spark/ml/classification/GBTClassifier.scala |  24 ++---
 .../classification/RandomForestClassifier.scala |  24 ++---
 .../ml/regression/DecisionTreeRegressor.scala   |  18 ++--
 .../spark/ml/regression/GBTRegressor.scala  |  24 ++---
 .../ml/regression/RandomForestRegressor.scala   |  24 ++---
 .../org/apache/spark/ml/tree/treeParams.scala   | 105 ---
 .../org/apache/spark/ml/util/ReadWrite.scala|  16 ---
 project/MimaExcludes.scala  |  68 
 python/pyspark/ml/util.py   |  32 --
 10 files changed, 134 insertions(+), 219 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b8733e0a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 9f60f08..5fb105c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -54,27 +54,27 @@ class DecisionTreeClassifier @Since("1.4.0") (
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMaxDepth(value: Int): this.type = set(maxDepth, value)
+  def setMaxDepth(value: Int): this.type = set(maxDepth, value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMaxBins(value: Int): this.type = set(maxBins, value)
+  def setMaxBins(value: Int): this.type = set(maxBins, value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMinInstancesPerNode(value: Int): this.type = 
set(minInstancesPerNode, value)
+  def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, 
value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setMinInfoGain(value: Double): this.type = set(minInfoGain, 
value)
+  def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
 
   /** @group expertSetParam */
   @Since("1.4.0")
-  override def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, 
value)
+  def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value)
 
   /** @group expertSetParam */
   @Since("1.4.0")
-  override def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, 
value)
+  def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value)
 
   /**
* Specifies how often to checkpoint the cached node IDs.
@@ -86,15 +86,15 @@ class DecisionTreeClassifier @Since("1.4.0") (
* @group setParam
*/
   @Since("1.4.0")
-  override def setCheckpointInterval(value: Int): this.type = 
set(checkpointInterval, value)
+  def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, 
value)
 
   /** @group setParam */
   @Since("1.4.0")
-  override def setImpurity(value: String): this.type = set(impurity, value)
+  def setImpurity(value: String): this.type = set(impurity, value)
 
   /** @group setParam */
   @Since("1.6.0")
-  override def setSeed(value: Long): this.type = set(seed, value)
+  def setSeed(value: Long): this.type = set(seed, value)
 
   override protected def train(dataset: Dataset[_]): 
DecisionTreeClassificationModel = {
 val categoricalFeatures: Map[Int, Int] =

http://git-wip-us.apache.org/repos/asf/spark/blob/b8733e0a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index ade0960..263ed10 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ 

spark-website git commit: Direct 2.1.0, 2.0.1 downloads to archive; use https links for download; Apache Hadoop; remove stale download logic

2017-05-09 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 7b32b181f -> b54c4f3fa


Direct 2.1.0, 2.0.1 downloads to archive; use https links for download; Apache 
Hadoop; remove stale download logic


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/b54c4f3f
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/b54c4f3f
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/b54c4f3f

Branch: refs/heads/asf-site
Commit: b54c4f3faf837a3e772af989eb7df0b64698e557
Parents: 7b32b18
Author: Sean Owen 
Authored: Tue May 9 10:09:21 2017 +0100
Committer: Sean Owen 
Committed: Tue May 9 10:09:21 2017 +0100

--
 js/downloads.js | 41 -
 1 file changed, 16 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/b54c4f3f/js/downloads.js
--
diff --git a/js/downloads.js b/js/downloads.js
index 81dcbfc..d308389 100644
--- a/js/downloads.js
+++ b/js/downloads.js
@@ -8,14 +8,14 @@ function addRelease(version, releaseDate, packages, stable) {
 }
 
 var sources = {pretty: "Source Code", tag: "sources"};
-var hadoopFree = {pretty: "Pre-build with user-provided Hadoop [can use with 
most Hadoop distributions]", tag: "without-hadoop"};
-var hadoop1 = {pretty: "Pre-built for Hadoop 1.X", tag: "hadoop1"};
+var hadoopFree = {pretty: "Pre-build with user-provided Apache Hadoop", tag: 
"without-hadoop"};
+var hadoop1 = {pretty: "Pre-built for Apache Hadoop 1.X", tag: "hadoop1"};
 var cdh4 = {pretty: "Pre-built for CDH 4", tag: "cdh4"};
-var hadoop2 = {pretty: "Pre-built for Hadoop 2.2", tag: "hadoop2"};
-var hadoop2p3 = {pretty: "Pre-built for Hadoop 2.3", tag: "hadoop2.3"};
-var hadoop2p4 = {pretty: "Pre-built for Hadoop 2.4", tag: "hadoop2.4"};
-var hadoop2p6 = {pretty: "Pre-built for Hadoop 2.6", tag: "hadoop2.6"};
-var hadoop2p7 = {pretty: "Pre-built for Hadoop 2.7 and later", tag: 
"hadoop2.7"};
+var hadoop2 = {pretty: "Pre-built for Apache Hadoop 2.2", tag: "hadoop2"};
+var hadoop2p3 = {pretty: "Pre-built for Apache Hadoop 2.3", tag: "hadoop2.3"};
+var hadoop2p4 = {pretty: "Pre-built for Apache Hadoop 2.4", tag: "hadoop2.4"};
+var hadoop2p6 = {pretty: "Pre-built for Apache Hadoop 2.6", tag: "hadoop2.6"};
+var hadoop2p7 = {pretty: "Pre-built for Apache Hadoop 2.7 and later", tag: 
"hadoop2.7"};
 
 // 1.4.0+
 var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, hadoop1, cdh4, 
sources];
@@ -135,7 +135,7 @@ function onVersionSelect() {
 append(packageSelect, option);
   }
 
-  var href = "http://www.apache.org/dist/spark/spark-; + version + "/";
+  var href = "https://www.apache.org/dist/spark/spark-; + version + "/";
   var link = "" + versionShort(version) + " 
signatures and checksums";
   append(verifyLink, link);
 
@@ -152,13 +152,8 @@ function onPackageSelect() {
 
   var pkg = getSelectedValue(packageSelect);
 
-  //if (pkg.toLowerCase().indexOf("mapr") > -1) {
-  //  var external = "External Download (MAY CONTAIN INCOMPATIBLE LICENSES)";
-  //  append(downloadSelect, "" + external + 
"");
-  //} else {
-append(downloadSelect, "Direct Download");
-append(downloadSelect, "Select Apache 
Mirror");
-  //}
+  append(downloadSelect, "Direct Download");
+  append(downloadSelect, "Select Apache 
Mirror");
   updateDownloadLink();
 }
 
@@ -184,18 +179,14 @@ function updateDownloadLink() {
 .replace(/\$pkg/g, pkg)
 .replace(/-bin-sources/, ""); // special case for source packages
 
-  var link = "http://d3kbcqa49mib13.cloudfront.net/$artifact;;
-  if (version < "0.8.0") {
-link = "http://spark-project.org/download/$artifact;;
-  }
-  if (pkg.toLowerCase().indexOf("mapr") > -1) {
-link = "http://package.mapr.com/tools/apache-spark/$ver/$artifact;
-  } else if (download == "apache") {
+  var link = "https://d3kbcqa49mib13.cloudfront.net/$artifact;;
+  if (download == "apache") {
 if (version < "1.6.3" ||
-(version >= "2.0.0" && version < "2.0.1")) {
-  link = "http://archive.apache.org/dist/spark/spark-$ver/$artifact;;
+(version >= "2.0.0" && version <= "2.0.1") ||
+(version >= "2.1.0" && version <= "2.1.0")) {
+  link = "https://archive.apache.org/dist/spark/spark-$ver/$artifact;;
 } else {
-  link = "http://www.apache.org/dyn/closer.lua/spark/spark-$ver/$artifact;;
+  link = 
"https://www.apache.org/dyn/closer.lua/spark/spark-$ver/$artifact;;
 }
   }
   link = link





spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

2017-05-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ca3f7edba -> 4bbfad44e


[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously, the argmax
function assumed that at least one value was defined if the vector size was
greater than zero.
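
For example, this minimal reproduction (a sketch based on the new test cases in
the diff below) triggered the exception before the fix:

```scala
import org.apache.spark.ml.linalg.{SparseVector, Vectors}

// A sparse vector with non-zero size but no stored (active) entries.
val v = Vectors.sparse(100, Array.empty[Int], Array.empty[Double])
  .asInstanceOf[SparseVector]

// Before this change, argmax indexed into the empty `indices` array and threw
// IndexOutOfBoundsException; with the new check it returns 0, since every value is 0.0.
assert(v.argmax == 0)
```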

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean 

Closes #17877 from jonmclean/vectorArgmaxIndexBug.

(cherry picked from commit be53a78352ae7c70d8a07d0df24574b3e3129b4a)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bbfad44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bbfad44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bbfad44

Branch: refs/heads/branch-2.2
Commit: 4bbfad44e426365ad9f4941d68c110523b17ea6d
Parents: ca3f7ed
Author: Jon McLean 
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen 
Committed: Tue May 9 09:47:58 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala   | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala  | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++
 4 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git 
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 8e166ba..3fbc095 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
--
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
index dfbdaf1..4cd91af 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
@@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
 assert(vec8.argmax === 0)
+
+// Check for case when sparse vector is non-empty but the values are empty
+val vec9 = Vectors.sparse(100, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec9.argmax === 0)
+
+val vec10 = Vectors.sparse(1, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec10.argmax === 0)
   }
 
   test("vector equals") {

http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 723addc..f063420 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 71a3cea..6172cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), 

spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

2017-05-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a1112c615 -> f7a91a17e


[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously, the argmax
function assumed that at least one value was defined if the vector size was
greater than zero.

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean 

Closes #17877 from jonmclean/vectorArgmaxIndexBug.

(cherry picked from commit be53a78352ae7c70d8a07d0df24574b3e3129b4a)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7a91a17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7a91a17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7a91a17

Branch: refs/heads/branch-2.1
Commit: f7a91a17e8e20965b3e634e611690a96f72cec6b
Parents: a1112c6
Author: Jon McLean 
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen 
Committed: Tue May 9 09:48:09 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala   | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala  | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++
 4 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git 
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 22e4ec6..7bc2cb1 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
--
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
index ea22c27..bd71656 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
@@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
 assert(vec8.argmax === 0)
+
+// Check for case when sparse vector is non-empty but the values are empty
+val vec9 = Vectors.sparse(100, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec9.argmax === 0)
+
+val vec10 = Vectors.sparse(1, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec10.argmax === 0)
   }
 
   test("vector equals") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 63ea9d3..5282849 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 71a3cea..6172cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), 

spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

2017-05-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 10b00abad -> be53a7835


[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously, the argmax
function assumed that at least one value was defined if the vector size was
greater than zero.

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean 

Closes #17877 from jonmclean/vectorArgmaxIndexBug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be53a783
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be53a783
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be53a783

Branch: refs/heads/master
Commit: be53a78352ae7c70d8a07d0df24574b3e3129b4a
Parents: 10b00ab
Author: Jon McLean 
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen 
Committed: Tue May 9 09:47:50 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala   | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala  | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++
 4 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git 
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 8e166ba..3fbc095 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
--
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
index dfbdaf1..4cd91af 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
@@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
 assert(vec8.argmax === 0)
+
+// Check for case when sparse vector is non-empty but the values are empty
+val vec9 = Vectors.sparse(100, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec9.argmax === 0)
+
+val vec10 = Vectors.sparse(1, Array.empty[Int], 
Array.empty[Double]).asInstanceOf[SparseVector]
+assert(vec10.argmax === 0)
   }
 
   test("vector equals") {

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 723addc..f063420 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") (
   override def argmax: Int = {
 if (size == 0) {
   -1
+} else if (numActives == 0) {
+  0
 } else {
   // Find the max active entry.
   var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 71a3cea..6172cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging {
 
 val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
 assert(vec8.argmax === 0)
+
+// Check for case when sparse vector is non-empty but the values are 

spark git commit: [SPARK-20587][ML] Improve performance of ML ALS recommendForAll

2017-05-09 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 72fca9a0a -> ca3f7edba


[SPARK-20587][ML] Improve performance of ML ALS recommendForAll

This PR is a `DataFrame` version of #17742 for
[SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), improving the
performance of the `recommendForAll` methods.
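
The core idea, sketched below in plain Scala for illustration only (this is not
the PR's code; it substitutes `scala.collection.mutable.PriorityQueue` for
Spark's internal `BoundedPriorityQueue`), is to score each block pair with a
hand-written dot product and keep only the top `num` candidates per source:

```scala
import scala.collection.mutable

// For each (src block, dst block) pair, score every (src, dst) factor pair with a
// hand-written dot product and keep only the best `num` candidates per source,
// instead of materializing all m * n scores.
def topKPerBlockPair(
    srcBlock: Seq[(Int, Array[Float])],
    dstBlock: Seq[(Int, Array[Float])],
    num: Int): Seq[(Int, Int, Float)] = {
  srcBlock.flatMap { case (srcId, srcFactor) =>
    // Min-heap on score, so the weakest retained candidate is evicted first.
    val pq = mutable.PriorityQueue.empty[(Int, Float)](
      Ordering.by[(Int, Float), Float](_._2).reverse)
    dstBlock.foreach { case (dstId, dstFactor) =>
      var score = 0.0f
      var k = 0
      while (k < srcFactor.length) {  // plain dot product, equivalent to blas.sdot
        score += srcFactor(k) * dstFactor(k)
        k += 1
      }
      if (pq.size < num) pq.enqueue(dstId -> score)
      else if (score > pq.head._2) { pq.dequeue(); pq.enqueue(dstId -> score) }
    }
    pq.dequeueAll.map { case (dstId, score) => (srcId, dstId, score) }
  }
}
```

The per-block results are then aggregated into a global top-k on the DataFrame
side, via the `TopByKeyAggregator` mentioned in the doc comment in the diff
below.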

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath 

Closes #17845 from MLnick/ml-als-perf.

(cherry picked from commit 10b00abadf4a3473332eef996db7b66f491316f2)
Signed-off-by: Nick Pentreath 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca3f7edb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca3f7edb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca3f7edb

Branch: refs/heads/branch-2.2
Commit: ca3f7edbad6a2e7fcd1c1d3dbd1a522cd0d7c476
Parents: 72fca9a
Author: Nick Pentreath 
Authored: Tue May 9 10:13:15 2017 +0200
Committer: Nick Pentreath 
Committed: Tue May 9 10:13:36 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala| 71 ++--
 1 file changed, 64 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca3f7edb/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef72..4a130e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, 
SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -356,6 +356,19 @@ class ALSModel private[ml] (
 
   /**
* Makes recommendations for all users (or items).
+   *
+   * Note: the previous approach used for computing top-k recommendations
+   * used a cross-join followed by predicting a score for each row of the 
joined dataset.
+   * However, this results in exploding the size of intermediate data. While 
Spark SQL makes it
+   * relatively efficient, the approach implemented here is significantly more 
efficient.
+   *
+   * This approach groups factors into blocks and computes the top-k elements 
per block,
+   * using a simple dot product (instead of gemm) and an efficient 
[[BoundedPriorityQueue]].
+   * It then computes the global top-k by aggregating the per block top-k 
elements with
+   * a [[TopByKeyAggregator]]. This significantly reduces the size of 
intermediate and shuffle data.
+   * This is the DataFrame equivalent to the approach used in
+   * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
+   *
* @param srcFactors src factors for which to generate recommendations
* @param dstFactors dst factors used to make recommendations
* @param srcOutputColumn name of the column for the source ID in the output 
DataFrame
@@ -372,11 +385,43 @@ class ALSModel private[ml] (
   num: Int): DataFrame = {
 import srcFactors.sparkSession.implicits._
 
-val ratings = srcFactors.crossJoin(dstFactors)
-  .select(
-srcFactors("id"),
-dstFactors("id"),
-predict(srcFactors("features"), dstFactors("features")))
+val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+  .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
+  .flatMap { case (srcIter, dstIter) =>
+val m = srcIter.size
+val n = math.min(dstIter.size, num)
+val output = new Array[(Int, Int, Float)](m * n)
+var j = 0
+val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+srcIter.foreach { case (srcId, srcFactor) =>
+  dstIter.foreach { case (dstId, dstFactor) =>
+/*
+ * The below code is equivalent to
+ *`val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
+ * This handwritten version is as or more efficient as BLAS calls 
in this case.
+ */
+var score = 0.0f
+var k = 0
+while (k < rank) {
+  score += srcFactor(k) * dstFactor(k)
+  k += 1
+}
+pq += dstId 

spark git commit: [SPARK-20587][ML] Improve performance of ML ALS recommendForAll

2017-05-09 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master 807942476 -> 10b00abad


[SPARK-20587][ML] Improve performance of ML ALS recommendForAll

This PR is a `DataFrame` version of #17742 for
[SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), improving the
performance of the `recommendForAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath 

Closes #17845 from MLnick/ml-als-perf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10b00aba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10b00aba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10b00aba

Branch: refs/heads/master
Commit: 10b00abadf4a3473332eef996db7b66f491316f2
Parents: 8079424
Author: Nick Pentreath 
Authored: Tue May 9 10:13:15 2017 +0200
Committer: Nick Pentreath 
Committed: Tue May 9 10:13:15 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala| 71 ++--
 1 file changed, 64 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10b00aba/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 1562bf1..d626f04 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, 
SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -356,6 +356,19 @@ class ALSModel private[ml] (
 
   /**
* Makes recommendations for all users (or items).
+   *
+   * Note: the previous approach used for computing top-k recommendations
+   * used a cross-join followed by predicting a score for each row of the 
joined dataset.
+   * However, this results in exploding the size of intermediate data. While 
Spark SQL makes it
+   * relatively efficient, the approach implemented here is significantly more 
efficient.
+   *
+   * This approach groups factors into blocks and computes the top-k elements 
per block,
+   * using a simple dot product (instead of gemm) and an efficient 
[[BoundedPriorityQueue]].
+   * It then computes the global top-k by aggregating the per block top-k 
elements with
+   * a [[TopByKeyAggregator]]. This significantly reduces the size of 
intermediate and shuffle data.
+   * This is the DataFrame equivalent to the approach used in
+   * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
+   *
* @param srcFactors src factors for which to generate recommendations
* @param dstFactors dst factors used to make recommendations
* @param srcOutputColumn name of the column for the source ID in the output 
DataFrame
@@ -372,11 +385,43 @@ class ALSModel private[ml] (
   num: Int): DataFrame = {
 import srcFactors.sparkSession.implicits._
 
-val ratings = srcFactors.crossJoin(dstFactors)
-  .select(
-srcFactors("id"),
-dstFactors("id"),
-predict(srcFactors("features"), dstFactors("features")))
+val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+  .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
+  .flatMap { case (srcIter, dstIter) =>
+val m = srcIter.size
+val n = math.min(dstIter.size, num)
+val output = new Array[(Int, Int, Float)](m * n)
+var j = 0
+val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+srcIter.foreach { case (srcId, srcFactor) =>
+  dstIter.foreach { case (dstId, dstFactor) =>
+/*
+ * The below code is equivalent to
+ *`val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
+ * This handwritten version is as or more efficient as BLAS calls 
in this case.
+ */
+var score = 0.0f
+var k = 0
+while (k < rank) {
+  score += srcFactor(k) * dstFactor(k)
+  k += 1
+}
+pq += dstId -> score
+  }
+  val pqIter = pq.iterator
+  var i = 0
+  while (i < n) {
+val 

spark git commit: [SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

2017-05-09 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master b952b44af -> 807942476


[SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

The recommendForAll of MLlib ALS is very slow, and GC is a key problem of the
current method. Each task uses the following code to hold its temporary result:
val output = new Array[(Int, (Int, Double))](m*n)
with m = n = 4096 (the default value, with no way to change it), so output is
about 4k * 4k * (4 + 4 + 8) bytes = 256 MB. This much memory causes serious GC
pressure and frequently leads to OOM.

Actually, we don't need to keep all the temporary results. Suppose we recommend
the topK (around 10 or 20) products for each user; then we only need about
4k * topK * (4 + 4 + 8) bytes to hold the temporary result.
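
A back-of-the-envelope check of those numbers (illustration only; it counts
just the 4 + 4 + 8 bytes of tuple payload per entry and ignores JVM object
headers and references):

```scala
val blockSize = 4096                 // m = n, the fixed block size
val bytesPerEntry = 4 + 4 + 8        // Int + Int + Double payload per (Int, (Int, Double))
val topK = 20

val oldTempBytes = blockSize.toLong * blockSize * bytesPerEntry  // 268,435,456 B = 256 MB
val newTempBytes = blockSize.toLong * topK * bytesPerEntry       // 1,310,720 B = 1280 KB

println(s"old: ${oldTempBytes >> 20} MB per block pair, new: ${newTempBytes >> 10} KB")
```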

Test environment:
3 workers, each with 10 cores, 30 GB of memory, and 1 executor.
Data: 480,000 users and 17,000 items.

BlockSize:      1024  2048  4096  8192
Old method:     245s  332s  488s  OOM
This solution:  121s  118s  117s  120s

Tested with the existing unit tests.

Author: Peng 
Author: Peng Meng 

Closes #17742 from mpjlu/OptimizeAls.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80794247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80794247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80794247

Branch: refs/heads/master
Commit: 8079424763c2043264f30a6898ce964379bd9b56
Parents: b952b44
Author: Peng 
Authored: Tue May 9 10:05:49 2017 +0200
Committer: Nick Pentreath 
Committed: Tue May 9 10:06:48 2017 +0200

--
 .../MatrixFactorizationModel.scala  | 81 
 1 file changed, 50 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80794247/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 23045fa..d45866c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue
 
 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(rank, srcFeatures)
-val dstBlocks = blockify(rank, dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-  case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-val m = srcIds.length
-val n = dstIds.length
-val ratings = srcFactors.transpose.multiply(dstFactors)
-val output = new Array[(Int, (Int, Double))](m * n)
-var k = 0
-ratings.foreachActive { (i, j, r) =>
-  output(k) = (srcIds(i), (dstIds(j), r))
-  k += 1
+val srcBlocks = blockify(srcFeatures)
+val dstBlocks = blockify(dstFeatures)
+/**
+ * The previous approach used for computing top-k recommendations aimed to 
group
+ * individual factor vectors into blocks, so that Level 3 BLAS operations 
(gemm) could
+ * be used for efficiency. However, this causes excessive GC pressure due 
to the large
+ * arrays required for intermediate result storage, as well as a high 
sensitivity to the
+ * block size used.
+ * The following approach still groups factors into blocks, but instead 
computes the
+ * top-k elements per block, using a simple dot product (instead of gemm) 
and an efficient
+ * [[BoundedPriorityQueue]]. This avoids any large intermediate data 
structures and results
+ * in significantly reduced GC pressure as well as shuffle data, which far 
outweighs
+ * any cost incurred from not using Level 3 BLAS operations.
+ */
+val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
+  val m = srcIter.size
+  val n = math.min(dstIter.size, num)
+  val output = new Array[(Int, (Int, Double))](m * n)
+  var j = 0
+  val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
+  srcIter.foreach { case (srcId, srcFactor) =>
+dstIter.foreach { case (dstId, dstFactor) =>
+  /*
+

spark git commit: [SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

2017-05-09 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 54e074349 -> 72fca9a0a


[SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

The recommendForAll of MLlib ALS is very slow, and GC is a key problem of the
current method. Each task uses the following code to hold its temporary result:
val output = new Array[(Int, (Int, Double))](m*n)
with m = n = 4096 (the default value, with no way to change it), so output is
about 4k * 4k * (4 + 4 + 8) bytes = 256 MB. This much memory causes serious GC
pressure and frequently leads to OOM.

Actually, we don't need to keep all the temporary results. Suppose we recommend
the topK (around 10 or 20) products for each user; then we only need about
4k * topK * (4 + 4 + 8) bytes to hold the temporary result.

Test environment:
3 workers, each with 10 cores, 30 GB of memory, and 1 executor.
Data: 480,000 users and 17,000 items.

BlockSize:      1024  2048  4096  8192
Old method:     245s  332s  488s  OOM
This solution:  121s  118s  117s  120s

Tested with the existing unit tests.

Author: Peng 
Author: Peng Meng 

Closes #17742 from mpjlu/OptimizeAls.

(cherry picked from commit 8079424763c2043264f30a6898ce964379bd9b56)
Signed-off-by: Nick Pentreath 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72fca9a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72fca9a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72fca9a0

Branch: refs/heads/branch-2.2
Commit: 72fca9a0a7a6dd2ab7c338fab9666b51cd981cce
Parents: 54e0743
Author: Peng 
Authored: Tue May 9 10:05:49 2017 +0200
Committer: Nick Pentreath 
Committed: Tue May 9 10:08:23 2017 +0200

--
 .../MatrixFactorizationModel.scala  | 81 
 1 file changed, 50 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72fca9a0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 23045fa..d45866c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue
 
 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(rank, srcFeatures)
-val dstBlocks = blockify(rank, dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-  case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-val m = srcIds.length
-val n = dstIds.length
-val ratings = srcFactors.transpose.multiply(dstFactors)
-val output = new Array[(Int, (Int, Double))](m * n)
-var k = 0
-ratings.foreachActive { (i, j, r) =>
-  output(k) = (srcIds(i), (dstIds(j), r))
-  k += 1
+val srcBlocks = blockify(srcFeatures)
+val dstBlocks = blockify(dstFeatures)
+/**
+ * The previous approach used for computing top-k recommendations aimed to 
group
+ * individual factor vectors into blocks, so that Level 3 BLAS operations 
(gemm) could
+ * be used for efficiency. However, this causes excessive GC pressure due 
to the large
+ * arrays required for intermediate result storage, as well as a high 
sensitivity to the
+ * block size used.
+ * The following approach still groups factors into blocks, but instead 
computes the
+ * top-k elements per block, using a simple dot product (instead of gemm) 
and an efficient
+ * [[BoundedPriorityQueue]]. This avoids any large intermediate data 
structures and results
+ * in significantly reduced GC pressure as well as shuffle data, which far 
outweighs
+ * any cost incurred from not using Level 3 BLAS operations.
+ */
+val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
+  val m = srcIter.size
+  val n = math.min(dstIter.size, num)
+  val output = new Array[(Int, (Int, Double))](m * n)
+  var j = 0
+  val pq = new BoundedPriorityQueue[(Int,