spark git commit: [SPARK-20590][SQL] Use Spark internal datasource if multiple sources are found for the same shortened name
Repository: spark
Updated Branches: refs/heads/branch-2.2 6a996b362 -> 7b6f3a118

[SPARK-20590][SQL] Use Spark internal datasource if multiple sources are found for the same shortened name

## What changes were proposed in this pull request?

One of the common usability problems around reading data in Spark (particularly CSV) is that there can often be a conflict between different readers on the classpath. As an example, if someone launches a 2.x Spark shell with the spark-csv package on the classpath, Spark currently fails in an extremely unfriendly way (see databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error by picking up the internal datasource if there is a single one (the datasource that has the "org.apache.spark" prefix):

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

## How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

**positive cases**:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```

```scala
scala> spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

**negative cases**:

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv.CsvRelatio. Please find packages at http://spark.apache.org/third-party-projects.html
...
```

Author: hyukjinkwon

Closes #17916 from HyukjinKwon/datasource-detect.

(cherry picked from commit 3d2131ab4ddead29601fb3c597b798202ac25fdd)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b6f3a11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b6f3a11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b6f3a11
Branch: refs/heads/branch-2.2
Commit: 7b6f3a118e973216264bbf356af2bb1e7870466e
Parents: 6a996b3
Author: hyukjinkwon
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan
Committed: Wed May 10 13:45:07 2017 +0800
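The disambiguation rule itself is small. Below is a minimal, hypothetical Scala sketch of the behavior described above; `pickProvider` and its surroundings are invented for illustration and are not the actual `DataSource.lookupDataSource` code, which handles alias resolution and error reporting in more detail.

```scala
// Illustrative sketch only (hypothetical helper, not Spark's real code):
// among several providers matched for one short name, prefer the single
// class under the org.apache.spark namespace and warn; otherwise keep the
// original hard failure asking for a fully qualified class name.
object DataSourceDisambiguation {
  def pickProvider(shortName: String, matched: List[Class[_]]): Class[_] = matched match {
    case Nil =>
      throw new ClassNotFoundException(s"Failed to find data source: $shortName")
    case single :: Nil =>
      single
    case multiple =>
      multiple.filter(_.getName.startsWith("org.apache.spark")) match {
        case internal :: Nil =>
          Console.err.println(s"WARN DataSource: Multiple sources found for $shortName " +
            s"(${multiple.map(_.getName).mkString(", ")}), defaulting to the internal " +
            s"datasource (${internal.getName}).")
          internal
        case _ =>
          // Zero or several internal candidates: the old behavior remains.
          sys.error(s"Multiple sources found for $shortName " +
            s"(${multiple.map(_.getName).mkString(", ")}), please specify the fully " +
            "qualified class name.")
      }
  }
}
```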
spark git commit: [SPARK-20590][SQL] Use Spark internal datasource if multiple sources are found for the same shortened name
Repository: spark
Updated Branches: refs/heads/master 771abeb46 -> 3d2131ab4

[SPARK-20590][SQL] Use Spark internal datasource if multiple sources are found for the same shortened name

## What changes were proposed in this pull request?

One of the common usability problems around reading data in Spark (particularly CSV) is that there can often be a conflict between different readers on the classpath. As an example, if someone launches a 2.x Spark shell with the spark-csv package on the classpath, Spark currently fails in an extremely unfriendly way (see databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error by picking up the internal datasource if there is a single one (the datasource that has the "org.apache.spark" prefix):

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

## How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

**positive cases**:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```

```scala
scala> spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

**negative cases**:

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv.CsvRelatio. Please find packages at http://spark.apache.org/third-party-projects.html
...
```

Author: hyukjinkwon

Closes #17916 from HyukjinKwon/datasource-detect.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d2131ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d2131ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d2131ab
Branch: refs/heads/master
Commit: 3d2131ab4ddead29601fb3c597b798202ac25fdd
Parents: 771abeb
Author: hyukjinkwon
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan
Committed: Wed May 10 13:44:47 2017 +0800

--
.../sql/execution/datasources/DataSource.scala | 19
spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey
Repository: spark
Updated Branches: refs/heads/branch-2.1 12c937ede -> 50f28dfe4

[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when `LIMIT > 1310720`:

```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
(
  SELECT t1.int, t2.int2
  FROM (SELECT * FROM tab1 LIMIT 1310721) t1
  INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
  ON (t1.int = t2.int AND t1.int2 = t2.int2)
) t;
```

This pull request fixes this issue.

## How was this patch tested?

unit tests

Author: Yuming Wang

Closes #17920 from wangyum/SPARK-17685.

(cherry picked from commit 771abeb46f637592aba2e63db2ed05b6cabfd0be)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50f28dfe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50f28dfe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50f28dfe
Branch: refs/heads/branch-2.1
Commit: 50f28dfe43410dadabbecc62e34fde8bacc8aee6
Parents: 12c937e
Author: Yuming Wang
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell
Committed: Tue May 9 19:45:22 2017 -0700

--
.../spark/sql/execution/joins/SortMergeJoinExec.scala   |  1 +
.../scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 10 ++
2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/50f28dfe/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index a1f9416..89a9b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -342,6 +342,7 @@ case class SortMergeJoinExec(
       keys: Seq[Expression],
       input: Seq[Attribute]): Seq[ExprCode] = {
     ctx.INPUT_ROW = row
+    ctx.currentVars = null
     keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/50f28dfe/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb5..9383e83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -248,4 +248,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     val ab = a.join(b, Seq("a"), "fullouter")
     checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+    val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+    val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+    val limit = 1310721
+    val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), "inner")
+      .agg(count($"int"))
+    checkAnswer(innerJoin, Row(1) :: Nil)
+  }
 }
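The new DataFrameJoinSuite test above can also be run as a standalone snippet. This is a sketch distilled from that test, with `SparkSession` setup added for self-containment; per the PR description, any limit above 1310720 used to trip the codegen bug.

```scala
// Standalone repro distilled from the DataFrameJoinSuite test in this commit.
// Before the one-line fix (ctx.currentVars = null), the generated sort-merge
// join code threw java.lang.IndexOutOfBoundsException for limits > 1310720.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().master("local[*]").appName("spark-17685-repro").getOrCreate()
import spark.implicits._

val df  = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
val limit = 1310721  // the smallest limit above the 1310720 threshold

val innerJoin = df.limit(limit)
  .join(df2.limit(limit), Seq("int", "int2"), "inner")
  .agg(count($"int"))

innerJoin.show()  // expected after the fix: a single row containing 1
```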
spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey
Repository: spark
Updated Branches: refs/heads/branch-2.2 7600a7ab6 -> 6a996b362

[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when `LIMIT > 1310720`:

```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
(
  SELECT t1.int, t2.int2
  FROM (SELECT * FROM tab1 LIMIT 1310721) t1
  INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
  ON (t1.int = t2.int AND t1.int2 = t2.int2)
) t;
```

This pull request fixes this issue.

## How was this patch tested?

unit tests

Author: Yuming Wang

Closes #17920 from wangyum/SPARK-17685.

(cherry picked from commit 771abeb46f637592aba2e63db2ed05b6cabfd0be)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a996b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a996b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a996b36
Branch: refs/heads/branch-2.2
Commit: 6a996b36283dcd22ff7aa38968a80f575d2f151e
Parents: 7600a7a
Author: Yuming Wang
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell
Committed: Tue May 9 19:45:09 2017 -0700

--
.../spark/sql/execution/joins/SortMergeJoinExec.scala   |  1 +
.../scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 10 ++
2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6a996b36/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index c6aae1a..26fb610 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -371,6 +371,7 @@ case class SortMergeJoinExec(
       keys: Seq[Expression],
       input: Seq[Attribute]): Seq[ExprCode] = {
     ctx.INPUT_ROW = row
+    ctx.currentVars = null
     keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6a996b36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4a52af6..aef0d7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -264,4 +264,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     val ab = a.join(b, Seq("a"), "fullouter")
     checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+    val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+    val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+    val limit = 1310721
+    val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), "inner")
+      .agg(count($"int"))
+    checkAnswer(innerJoin, Row(1) :: Nil)
+  }
 }
spark git commit: [SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey
Repository: spark
Updated Branches: refs/heads/master c0189abc7 -> 771abeb46

[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars null when calling createJoinKey

## What changes were proposed in this pull request?

The following SQL query causes an `IndexOutOfBoundsException` when `LIMIT > 1310720`:

```sql
CREATE TABLE tab1(int int, int2 int, str string);
CREATE TABLE tab2(int int, int2 int, str string);
INSERT INTO tab1 values(1,1,'str');
INSERT INTO tab1 values(2,2,'str');
INSERT INTO tab2 values(1,1,'str');
INSERT INTO tab2 values(2,3,'str');

SELECT
  count(*)
FROM
(
  SELECT t1.int, t2.int2
  FROM (SELECT * FROM tab1 LIMIT 1310721) t1
  INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2
  ON (t1.int = t2.int AND t1.int2 = t2.int2)
) t;
```

This pull request fixes this issue.

## How was this patch tested?

unit tests

Author: Yuming Wang

Closes #17920 from wangyum/SPARK-17685.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/771abeb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/771abeb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/771abeb4
Branch: refs/heads/master
Commit: 771abeb46f637592aba2e63db2ed05b6cabfd0be
Parents: c0189ab
Author: Yuming Wang
Authored: Tue May 9 19:45:00 2017 -0700
Committer: Herman van Hovell
Committed: Tue May 9 19:45:00 2017 -0700

--
.../spark/sql/execution/joins/SortMergeJoinExec.scala   |  1 +
.../scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 10 ++
2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/771abeb4/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index c6aae1a..26fb610 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -371,6 +371,7 @@ case class SortMergeJoinExec(
       keys: Seq[Expression],
       input: Seq[Attribute]): Seq[ExprCode] = {
     ctx.INPUT_ROW = row
+    ctx.currentVars = null
     keys.map(BindReferences.bindReference(_, input).genCode(ctx))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/771abeb4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4a52af6..aef0d7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -264,4 +264,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     val ab = a.join(b, Seq("a"), "fullouter")
     checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
+
+  test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
+    val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
+    val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
+    val limit = 1310721
+    val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), "inner")
+      .agg(count($"int"))
+    checkAnswer(innerJoin, Row(1) :: Nil)
+  }
 }
spark git commit: [SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
Repository: spark
Updated Branches: refs/heads/branch-2.2 d191b962d -> 7600a7ab6

[SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query with the operation `withWatermark` does not execute because the batch planner does not have any rule to explicitly handle the EventTimeWatermark logical plan. The right solution is to simply remove the plan node, as the watermark should not affect any batch query in any way.

Changes:
- In this PR, we add a new rule `EliminateEventTimeWatermark` to check if we need to ignore the event time watermark. We will ignore the watermark in any batch query.

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot add this rule into the analyzer directly, because a streaming query is copied to `triggerLogicalPlan` in every trigger, and the rule would be applied to `triggerLogicalPlan` mistakenly.

Others:
- A typo fix in an example.

## How was this patch tested?

Added a new unit test.

Author: uncleGen

Closes #17896 from uncleGen/SPARK-20373.

(cherry picked from commit c0189abc7c6ddbecc1832d2ff0cfc5546a010b60)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7600a7ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7600a7ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7600a7ab
Branch: refs/heads/branch-2.2
Commit: 7600a7ab65777a59f3a33edef40328b6a5d864ef
Parents: d191b96
Author: uncleGen
Authored: Tue May 9 15:08:09 2017 -0700
Committer: Shixiong Zhu
Committed: Tue May 9 15:08:38 2017 -0700

--
docs/structured-streaming-programming-guide.md             |  3 +++
.../examples/sql/streaming/StructuredSessionization.scala  |  4 ++--
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 10 ++
.../src/main/scala/org/apache/spark/sql/Dataset.scala      |  3 ++-
.../spark/sql/streaming/EventTimeWatermarkSuite.scala      | 10 ++
5 files changed, 27 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/docs/structured-streaming-programming-guide.md
--

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 53b3db2..bd01be9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not supported fine-grained updates that Update Mode
 with them, we have also support Append Mode, where only the *final counts* are written to sink.
 This is illustrated below.

+Note that using `withWatermark` on a non-streaming Dataset is no-op. As the watermark should not affect
+any batch query in any way, we will ignore it directly.
+
 ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)

 Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.

http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
--

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index 2ce792c..ed63fb6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk `
  * and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
  * localhost `
  */
 object StructuredSessionization {

   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: StructuredNetworkWordCount ")
+      System.err.println("Usage: StructuredSessionization ")
       System.exit(1)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7600a7ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
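For reference, the heart of such a rule is a one-case plan transform. The sketch below is simplified from what the patch adds to Analyzer.scala (the diff is truncated above), so treat it as an approximation rather than the exact committed code:

```scala
// Simplified sketch of an EliminateEventTimeWatermark-style rule: drop the
// EventTimeWatermark node whenever its child is a batch (non-streaming) plan,
// since a watermark cannot affect a batch query.
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Streaming children keep their watermark; batch children shed the node.
    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
  }
}
```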
spark git commit: [SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
Repository: spark
Updated Branches: refs/heads/master f79aa285c -> c0189abc7

[SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query with the operation `withWatermark` does not execute because the batch planner does not have any rule to explicitly handle the EventTimeWatermark logical plan. The right solution is to simply remove the plan node, as the watermark should not affect any batch query in any way.

Changes:
- In this PR, we add a new rule `EliminateEventTimeWatermark` to check if we need to ignore the event time watermark. We will ignore the watermark in any batch query.

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot add this rule into the analyzer directly, because a streaming query is copied to `triggerLogicalPlan` in every trigger, and the rule would be applied to `triggerLogicalPlan` mistakenly.

Others:
- A typo fix in an example.

## How was this patch tested?

Added a new unit test.

Author: uncleGen

Closes #17896 from uncleGen/SPARK-20373.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0189abc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0189abc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0189abc
Branch: refs/heads/master
Commit: c0189abc7c6ddbecc1832d2ff0cfc5546a010b60
Parents: f79aa28
Author: uncleGen
Authored: Tue May 9 15:08:09 2017 -0700
Committer: Shixiong Zhu
Committed: Tue May 9 15:08:09 2017 -0700

--
docs/structured-streaming-programming-guide.md             |  3 +++
.../examples/sql/streaming/StructuredSessionization.scala  |  4 ++--
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 10 ++
.../src/main/scala/org/apache/spark/sql/Dataset.scala      |  3 ++-
.../spark/sql/streaming/EventTimeWatermarkSuite.scala      | 10 ++
5 files changed, 27 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/docs/structured-streaming-programming-guide.md
--

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 53b3db2..bd01be9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not supported fine-grained updates that Update Mode
 with them, we have also support Append Mode, where only the *final counts* are written to sink.
 This is illustrated below.

+Note that using `withWatermark` on a non-streaming Dataset is no-op. As the watermark should not affect
+any batch query in any way, we will ignore it directly.
+
 ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)

 Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
--

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index 2ce792c..ed63fb6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk `
  * and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
  * localhost `
  */
 object StructuredSessionization {

   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: StructuredNetworkWordCount ")
+      System.err.println("Usage: StructuredSessionization ")
       System.exit(1)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 72e7d5d..c56dd36 100644
---
spark git commit: Revert "[SPARK-20311][SQL] Support aliases for table value functions"
Repository: spark
Updated Branches: refs/heads/branch-2.2 9e8d23b3a -> d191b962d

Revert "[SPARK-20311][SQL] Support aliases for table value functions"

This reverts commit 714811d0b5bcb5d47c39782ff74f898d276ecc59.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d191b962
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d191b962
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d191b962
Branch: refs/heads/branch-2.2
Commit: d191b962dc81c015fa92a38d882a8c7ea620ef06
Parents: 9e8d23b
Author: Yin Huai
Authored: Tue May 9 14:47:45 2017 -0700
Committer: Yin Huai
Committed: Tue May 9 14:49:02 2017 -0700

--
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 ++
.../analysis/ResolveTableValuedFunctions.scala  | 22 +++-
.../sql/catalyst/analysis/unresolved.scala      | 10 ++---
.../spark/sql/catalyst/parser/AstBuilder.scala  | 17 ---
.../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +
.../sql/catalyst/parser/PlanParserSuite.scala   | 13 +---
6 files changed, 17 insertions(+), 79 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d191b962/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 15e4dd4..1ecb3d1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,23 +472,15 @@ identifierComment
     ;

 relationPrimary
-    : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
-    | '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
-    | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
-    | inlineTable                                          #inlineTableDefault2
-    | functionTable                                        #tableValuedFunction
+    : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
+    | '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
+    | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
+    | inlineTable                                          #inlineTableDefault2
+    | identifier '(' (expression (',' expression)*)? ')'   #tableValuedFunction
     ;

 inlineTable
-    : VALUES expression (',' expression)* tableAlias
-    ;
-
-functionTable
-    : identifier '(' (expression (',' expression)*)? ')' tableAlias
-    ;
-
-tableAlias
-    : (AS? identifier identifierList?)?
+    : VALUES expression (',' expression)* (AS? identifier identifierList?)?
     ;

 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/d191b962/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index dad1340..de6de24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

 import java.util.Locale

-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Range}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
-      val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
+      builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
         case Some(tvf) =>
           val resolved = tvf.flatMap { case (argList, resolver) =>
             argList.implicitCast(u.functionArgs) match {
@@ -125,21 +125,5 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan]
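For context on what was reverted: the removed `functionTable`/`tableAlias` grammar rules above had let a table-valued function carry an alias and column names (for example `range(2) AS t(x)`). A short, hedged illustration of the built-in `range` TVF that `ResolveTableValuedFunctions` resolves, which works with or without the reverted alias support (assumes an existing `SparkSession` named `spark`):

```scala
// Plain TVF calls like this are unaffected by the revert; only the alias
// forms enabled by the removed `tableAlias` grammar rule go away.
val df = spark.sql("SELECT id FROM range(3)")
df.show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// +---+
```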
spark git commit: Revert "[SPARK-20311][SQL] Support aliases for table value functions"
Repository: spark
Updated Branches: refs/heads/master ac1ab6b9d -> f79aa285c

Revert "[SPARK-20311][SQL] Support aliases for table value functions"

This reverts commit 714811d0b5bcb5d47c39782ff74f898d276ecc59.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f79aa285
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f79aa285
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f79aa285
Branch: refs/heads/master
Commit: f79aa285cf115963ba06a9cacb3dbd7e3cbf7728
Parents: ac1ab6b
Author: Yin Huai
Authored: Tue May 9 14:47:45 2017 -0700
Committer: Yin Huai
Committed: Tue May 9 14:47:45 2017 -0700

--
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 ++
.../analysis/ResolveTableValuedFunctions.scala  | 22 +++-
.../sql/catalyst/analysis/unresolved.scala      | 10 ++---
.../spark/sql/catalyst/parser/AstBuilder.scala  | 17 ---
.../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +
.../sql/catalyst/parser/PlanParserSuite.scala   | 13 +---
6 files changed, 17 insertions(+), 79 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f79aa285/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 41daf58..14c511f 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,23 +472,15 @@ identifierComment
     ;

 relationPrimary
-    : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
-    | '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
-    | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
-    | inlineTable                                          #inlineTableDefault2
-    | functionTable                                        #tableValuedFunction
+    : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
+    | '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
+    | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
+    | inlineTable                                          #inlineTableDefault2
+    | identifier '(' (expression (',' expression)*)? ')'   #tableValuedFunction
    ;

 inlineTable
-    : VALUES expression (',' expression)* tableAlias
-    ;
-
-functionTable
-    : identifier '(' (expression (',' expression)*)? ')' tableAlias
-    ;
-
-tableAlias
-    : (AS? identifier identifierList?)?
+    : VALUES expression (',' expression)* (AS? identifier identifierList?)?
     ;

 rowFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/f79aa285/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index dad1340..de6de24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

 import java.util.Locale

-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Range}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

@@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
-      val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
+      builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
         case Some(tvf) =>
           val resolved = tvf.flatMap { case (argList, resolver) =>
             argList.implicitCast(u.functionArgs) match {
@@ -125,21 +125,5 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
spark git commit: Revert "[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps"
Repository: spark
Updated Branches: refs/heads/master 1b85bcd92 -> ac1ab6b9d

Revert "[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps"

This reverts commit 22691556e5f0dfbac81b8cc9ca0a67c70c1711ca.

See JIRA ticket for more information.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac1ab6b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac1ab6b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac1ab6b9
Branch: refs/heads/master
Commit: ac1ab6b9db188ac54c745558d57dd0a031d0b162
Parents: 1b85bcd
Author: Reynold Xin
Authored: Tue May 9 11:35:59 2017 -0700
Committer: Reynold Xin
Committed: Tue May 9 11:35:59 2017 -0700

--
.../spark/sql/catalyst/catalog/interface.scala  |   4 +-
.../spark/sql/catalyst/util/DateTimeUtils.scala |   5 -
.../parquet/VectorizedColumnReader.java         |  28 +-
.../parquet/VectorizedParquetRecordReader.java  |   6 +-
.../spark/sql/execution/command/tables.scala    |   8 +-
.../datasources/parquet/ParquetFileFormat.scala |   2 -
.../parquet/ParquetReadSupport.scala            |   3 +-
.../parquet/ParquetRecordMaterializer.scala     |   9 +-
.../parquet/ParquetRowConverter.scala           |  53 +--
.../parquet/ParquetWriteSupport.scala           |  25 +-
.../spark/sql/hive/HiveExternalCatalog.scala    |  11 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
.../hive/ParquetHiveCompatibilitySuite.scala    | 379 +--
13 files changed, 29 insertions(+), 516 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c39017e..cc0cbba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
     val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
       val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
         null

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index bf596fa..6c1592f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -498,11 +498,6 @@ object DateTimeUtils {
     false
   }

-  lazy val validTimezones = TimeZone.getAvailableIDs().toSet
-  def isValidTimezone(timezoneId: String): Boolean = {
-    validTimezones.contains(timezoneId)
-  }
-
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since epoch.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
--

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index dabbc2b..9d641b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,9 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet;

 import java.io.IOException;
-import java.util.TimeZone;

-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesUtils;
 import
spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version
Repository: spark
Updated Branches: refs/heads/branch-2.2 c7bd909f6 -> 9e8d23b3a

[SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version

## What changes were proposed in this pull request?

Drop the hadoop distribution name from the Python version (PEP 440 - https://www.python.org/dev/peps/pep-0440/). We've been using the local version string to disambiguate between different hadoop versions packaged with PySpark, but PEP 440 states that local versions should not be used when publishing up-stream. Since we no longer make PySpark pip packages for different hadoop versions, we can simply drop the hadoop information. If at a later point we need to start publishing different hadoop versions, we can look at making different packages or similar.

## How was this patch tested?

Ran `make-distribution` locally.

Author: Holden Karau

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.

(cherry picked from commit 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e)
Signed-off-by: Holden Karau

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e8d23b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e8d23b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e8d23b3
Branch: refs/heads/branch-2.2
Commit: 9e8d23b3a2f99985ffb3c4eb67ac0a2774fa5b02
Parents: c7bd909
Author: Holden Karau
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau
Committed: Tue May 9 11:26:00 2017 -0700

--
dev/create-release/release-build.sh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9e8d23b3/dev/create-release/release-build.sh
--

diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 7976d8a..a72307a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
     export ZINC_PORT=$ZINC_PORT
     echo "Creating distribution: $NAME ($FLAGS)"

-    # Write out the NAME and VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
-    # to dev0 to be closer to PEP440. We use the NAME as a "local version".
-    PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
+    # Write out the VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
+    # to dev0 to be closer to PEP440.
+    PYSPARK_VERSION=`echo "$SPARK_VERSION" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
     echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py

     # Get maven home set by MVN
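To make the rewrite concrete, here is a small Scala sketch mirroring the two `sed` substitutions in the script above (like `sed` without the `/g` flag, `replaceFirst` replaces only the first occurrence; the sample inputs and the "hadoop2.7" name are assumptions for illustration):

```scala
// Mirrors: echo "$SPARK_VERSION" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"
def pysparkVersion(sparkVersion: String): String =
  sparkVersion.replaceFirst("-", ".").replaceFirst("SNAPSHOT", "dev0")

println(pysparkVersion("2.2.0-SNAPSHOT"))  // 2.2.0.dev0 -- a valid PEP 440 dev release
println(pysparkVersion("2.2.0"))           // 2.2.0      -- release versions pass through
// The dropped behavior had appended "+$NAME" (e.g. "+hadoop2.7"), producing a
// PEP 440 "local version" such as 2.2.0.dev0+hadoop2.7, which should not be
// published upstream.
```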
spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version
Repository: spark
Updated Branches: refs/heads/branch-2.1 f7a91a17e -> 12c937ede

[SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version

## What changes were proposed in this pull request?

Drop the hadoop distribution name from the Python version (PEP 440 - https://www.python.org/dev/peps/pep-0440/). We've been using the local version string to disambiguate between different hadoop versions packaged with PySpark, but PEP 440 states that local versions should not be used when publishing up-stream. Since we no longer make PySpark pip packages for different hadoop versions, we can simply drop the hadoop information. If at a later point we need to start publishing different hadoop versions, we can look at making different packages or similar.

## How was this patch tested?

Ran `make-distribution` locally.

Author: Holden Karau

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.

(cherry picked from commit 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e)
Signed-off-by: Holden Karau

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12c937ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12c937ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12c937ed
Branch: refs/heads/branch-2.1
Commit: 12c937ede309d6886ce1ceadff2691f31dcdd6d3
Parents: f7a91a1
Author: Holden Karau
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau
Committed: Tue May 9 11:26:25 2017 -0700

--
dev/create-release/release-build.sh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/12c937ed/dev/create-release/release-build.sh
--

diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index ab17f2f..c4ddc21 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
     export ZINC_PORT=$ZINC_PORT
     echo "Creating distribution: $NAME ($FLAGS)"

-    # Write out the NAME and VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
-    # to dev0 to be closer to PEP440. We use the NAME as a "local version".
-    PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
+    # Write out the VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
+    # to dev0 to be closer to PEP440.
+    PYSPARK_VERSION=`echo "$SPARK_VERSION" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
     echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py

     # Get maven home set by MVN
spark git commit: [SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version
Repository: spark
Updated Branches: refs/heads/master 25ee816e0 -> 1b85bcd92

[SPARK-20627][PYSPARK] Drop the hadoop distribution name from the Python version

## What changes were proposed in this pull request?

Drop the hadoop distribution name from the Python version (PEP 440 - https://www.python.org/dev/peps/pep-0440/). We've been using the local version string to disambiguate between different hadoop versions packaged with PySpark, but PEP 440 states that local versions should not be used when publishing up-stream. Since we no longer make PySpark pip packages for different hadoop versions, we can simply drop the hadoop information. If at a later point we need to start publishing different hadoop versions, we can look at making different packages or similar.

## How was this patch tested?

Ran `make-distribution` locally.

Author: Holden Karau

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b85bcd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b85bcd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b85bcd9
Branch: refs/heads/master
Commit: 1b85bcd9298cf84dd746fe8e91ab0b0df69ef17e
Parents: 25ee816
Author: Holden Karau
Authored: Tue May 9 11:25:29 2017 -0700
Committer: Holden Karau
Committed: Tue May 9 11:25:29 2017 -0700

--
dev/create-release/release-build.sh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1b85bcd9/dev/create-release/release-build.sh
--

diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 7976d8a..a72307a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -163,9 +163,9 @@ if [[ "$1" == "package" ]]; then
     export ZINC_PORT=$ZINC_PORT
     echo "Creating distribution: $NAME ($FLAGS)"

-    # Write out the NAME and VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
-    # to dev0 to be closer to PEP440. We use the NAME as a "local version".
-    PYSPARK_VERSION=`echo "$SPARK_VERSION+$NAME" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
+    # Write out the VERSION to PySpark version info we rewrite the - into a . and SNAPSHOT
+    # to dev0 to be closer to PEP440.
+    PYSPARK_VERSION=`echo "$SPARK_VERSION" | sed -r "s/-/./" | sed -r "s/SNAPSHOT/dev0/"`
     echo "__version__='$PYSPARK_VERSION'" > python/pyspark/version.py

     # Get maven home set by MVN
spark git commit: [SPARK-19876][BUILD] Move Trigger.java to java source hierarchy
Repository: spark
Updated Branches: refs/heads/branch-2.2 73aa23b8e -> c7bd909f6

[SPARK-19876][BUILD] Move Trigger.java to java source hierarchy

## What changes were proposed in this pull request?

Simply moves `Trigger.java` to `src/main/java` from `src/main/scala`. See https://github.com/apache/spark/pull/17219

## How was this patch tested?

Existing tests.

Author: Sean Owen

Closes #17921 from srowen/SPARK-19876.2.

(cherry picked from commit 25ee816e090c42f0e35be2d2cb0f8ec60726317c)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7bd909f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7bd909f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7bd909f
Branch: refs/heads/branch-2.2
Commit: c7bd909f67209b4d1354c3d5b0a0fb1d4e28f205
Parents: 73aa23b
Author: Sean Owen
Authored: Tue May 9 10:22:23 2017 -0700
Committer: Herman van Hovell
Committed: Tue May 9 10:22:32 2017 -0700

--
.../org/apache/spark/sql/streaming/Trigger.java | 105 +++
.../org/apache/spark/sql/streaming/Trigger.java | 105 ---
2 files changed, 105 insertions(+), 105 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c7bd909f/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
--

diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 000..3e3997f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+    return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+    return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+    return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   *
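Since this commit only relocates the file, the API is unchanged. As a quick orientation, here is a hedged Scala usage sketch of the `Trigger.ProcessingTime` variant documented above; the socket source setup is illustrative, and any streaming DataFrame works:

```scala
// Usage sketch for the (then Experimental) Trigger API shown above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").appName("trigger-demo").getOrCreate()

// A socket source is used here purely for illustration.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val query = lines.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // fire a micro-batch every 10s
  .start()

query.awaitTermination()
```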
spark git commit: [SPARK-19876][BUILD] Move Trigger.java to java source hierarchy
Repository: spark Updated Branches: refs/heads/master d099f414d -> 25ee816e0 [SPARK-19876][BUILD] Move Trigger.java to java source hierarchy ## What changes were proposed in this pull request? Simply moves `Trigger.java` to `src/main/java` from `src/main/scala` See https://github.com/apache/spark/pull/17219 ## How was this patch tested? Existing tests. Author: Sean OwenCloses #17921 from srowen/SPARK-19876.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25ee816e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25ee816e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25ee816e Branch: refs/heads/master Commit: 25ee816e090c42f0e35be2d2cb0f8ec60726317c Parents: d099f41 Author: Sean Owen Authored: Tue May 9 10:22:23 2017 -0700 Committer: Herman van Hovell Committed: Tue May 9 10:22:23 2017 -0700 -- .../org/apache/spark/sql/streaming/Trigger.java | 105 +++ .../org/apache/spark/sql/streaming/Trigger.java | 105 --- 2 files changed, 105 insertions(+), 105 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/25ee816e/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java new file mode 100644 index 000..3e3997f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; + +/** + * :: Experimental :: + * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. + * + * @since 2.0.0 + */ +@Experimental +@InterfaceStability.Evolving +public class Trigger { + + /** + * :: Experimental :: + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * @since 2.2.0 + */ + public static Trigger ProcessingTime(long intervalMs) { + return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * :: Experimental :: + * (Java-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. 
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+    return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+    return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+    return ProcessingTime.apply(interval);
+  }
+
+  /**
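For readers who want to exercise the overloads above end to end, here is a minimal hedged sketch of a streaming query using this trigger API. It assumes a local socket source fed with something like `nc -lk 9999`; the object and option values are illustrative, not part of the patch:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("trigger-example").getOrCreate()

    // A toy streaming source for illustration; feed it with e.g. `nc -lk 9999`.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Fire one micro-batch every 10 seconds. The millis, TimeUnit, Duration
    // and String overloads shown in the javadoc above are interchangeable.
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .start()

    query.awaitTermination()
  }
}
```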
spark git commit: [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF
Repository: spark Updated Branches: refs/heads/branch-2.2 08e1b78f0 -> 73aa23b8e [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF ## What changes were proposed in this pull request? For some reason we don't have an API to register UserDefinedFunction as named UDF. It is a no brainer to add one, in addition to the existing register functions we have. ## How was this patch tested? Added a test case in UDFSuite for the new API. Author: Reynold XinCloses #17915 from rxin/SPARK-20674. (cherry picked from commit d099f414d2cb53f5a61f6e77317c736be6f953a0) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73aa23b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73aa23b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73aa23b8 Branch: refs/heads/branch-2.2 Commit: 73aa23b8ef64960e7f171aa07aec396667a2339d Parents: 08e1b78 Author: Reynold Xin Authored: Tue May 9 09:24:28 2017 -0700 Committer: Xiao Li Committed: Tue May 9 09:24:36 2017 -0700 -- .../org/apache/spark/sql/UDFRegistration.scala | 22 +--- .../scala/org/apache/spark/sql/UDFSuite.scala | 7 +++ 2 files changed, 26 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73aa23b8/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index a576733..6accf1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -70,15 +70,31 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends * @param name the name of the UDAF. * @param udaf the UDAF needs to be registered. * @return the registered UDAF. + * + * @since 1.5.0 */ - def register( - name: String, - udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = { + def register(name: String, udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = { def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf) functionRegistry.registerFunction(name, builder) udaf } + /** + * Register a user-defined function (UDF), for a UDF that's already defined using the DataFrame + * API (i.e. of type UserDefinedFunction). + * + * @param name the name of the UDF. + * @param udf the UDF needs to be registered. + * @return the registered UDF. 
+   *
+   * @since 2.2.0
+   */
+  def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {
+    def builder(children: Seq[Expression]) = udf.apply(children.map(Column.apply) : _*).expr
+    functionRegistry.registerFunction(name, builder)
+    udf
+  }
+
   // scalastyle:off line.size.limit
   /* register 0-22 were generated by this script

http://git-wip-us.apache.org/repos/asf/spark/blob/73aa23b8/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index ae6b2bc..6f8723a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -93,6 +93,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
   }

+  test("UDF defined using UserDefinedFunction") {
+    import functions.udf
+    val foo = udf((x: Int) => x + 1)
+    spark.udf.register("foo", foo)
+    assert(sql("select foo(5)").head().getInt(0) == 6)
+  }
+
   test("ZeroArgument UDF") {
     spark.udf.register("random0", () => { Math.random()})
     assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
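The new overload closes the round trip between the DataFrame-style and SQL-style UDF APIs: define the function once, register it by name. A small usage sketch (application and function names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object RegisterUdfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf-register").master("local[*]").getOrCreate()

    // Define the UDF once with the DataFrame API...
    val plusOne = udf((x: Int) => x + 1)

    // ...then, with this patch, expose the very same function to SQL by name.
    spark.udf.register("plus_one", plusOne)

    spark.range(3).createOrReplaceTempView("nums")
    spark.sql("SELECT plus_one(CAST(id AS INT)) FROM nums").show()
    // Before this change only the (name, function literal) overloads existed,
    // so the lambda had to be written twice to use it from both APIs.

    spark.stop()
  }
}
```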
spark git commit: [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF
Repository: spark Updated Branches: refs/heads/master f561a76b2 -> d099f414d [SPARK-20674][SQL] Support registering UserDefinedFunction as named UDF ## What changes were proposed in this pull request? For some reason we don't have an API to register UserDefinedFunction as named UDF. It is a no brainer to add one, in addition to the existing register functions we have. ## How was this patch tested? Added a test case in UDFSuite for the new API. Author: Reynold XinCloses #17915 from rxin/SPARK-20674. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d099f414 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d099f414 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d099f414 Branch: refs/heads/master Commit: d099f414d2cb53f5a61f6e77317c736be6f953a0 Parents: f561a76 Author: Reynold Xin Authored: Tue May 9 09:24:28 2017 -0700 Committer: Xiao Li Committed: Tue May 9 09:24:28 2017 -0700 -- .../org/apache/spark/sql/UDFRegistration.scala | 22 +--- .../scala/org/apache/spark/sql/UDFSuite.scala | 7 +++ 2 files changed, 26 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d099f414/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index a576733..6accf1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -70,15 +70,31 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends * @param name the name of the UDAF. * @param udaf the UDAF needs to be registered. * @return the registered UDAF. + * + * @since 1.5.0 */ - def register( - name: String, - udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = { + def register(name: String, udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = { def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf) functionRegistry.registerFunction(name, builder) udaf } + /** + * Register a user-defined function (UDF), for a UDF that's already defined using the DataFrame + * API (i.e. of type UserDefinedFunction). + * + * @param name the name of the UDF. + * @param udf the UDF needs to be registered. + * @return the registered UDF. 
+   *
+   * @since 2.2.0
+   */
+  def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {
+    def builder(children: Seq[Expression]) = udf.apply(children.map(Column.apply) : _*).expr
+    functionRegistry.registerFunction(name, builder)
+    udf
+  }
+
   // scalastyle:off line.size.limit
   /* register 0-22 were generated by this script

http://git-wip-us.apache.org/repos/asf/spark/blob/d099f414/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index ae6b2bc..6f8723a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -93,6 +93,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
   }

+  test("UDF defined using UserDefinedFunction") {
+    import functions.udf
+    val foo = udf((x: Int) => x + 1)
+    spark.udf.register("foo", foo)
+    assert(sql("select foo(5)").head().getInt(0) == 6)
+  }
+
   test("ZeroArgument UDF") {
     spark.udf.register("random0", () => { Math.random()})
     assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
spark git commit: [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases
Repository: spark Updated Branches: refs/heads/branch-2.2 272d2a10d -> 08e1b78f0 [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases `ReplSuite.newProductSeqEncoder with REPL defined class` was flaky and throws OOM exception frequently. By analyzing the heap dump, we found the reason is that, in each test case of `ReplSuite`, we create a REPL instance, which creates a classloader and loads a lot of classes related to `SparkContext`. More details please see https://github.com/apache/spark/pull/17833#issuecomment-298711435. In this PR, we create a new test suite, `SingletonReplSuite`, which shares one REPL instances among all the test cases. Then we move most of the tests from `ReplSuite` to `SingletonReplSuite`, to avoid creating a lot of REPL instances and reduce memory footprint. test only change Author: Wenchen FanCloses #17844 from cloud-fan/flaky-test. (cherry picked from commit f561a76b2f895dea52f228a9376948242c3331ad) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e1b78f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e1b78f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e1b78f Branch: refs/heads/branch-2.2 Commit: 08e1b78f01955c7151d9e984d392d45deced6e34 Parents: 272d2a1 Author: Wenchen Fan Authored: Wed May 10 00:09:35 2017 +0800 Committer: Wenchen Fan Committed: Wed May 10 00:11:25 2017 +0800 -- .../main/scala/org/apache/spark/repl/Main.scala | 2 +- .../org/apache/spark/repl/SparkILoop.scala | 9 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 271 +--- .../apache/spark/repl/SingletonReplSuite.scala | 408 +++ 4 files changed, 412 insertions(+), 278 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 39fc621..b8b38e8 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -68,7 +68,7 @@ object Main extends Logging { if (!hasErrors) { interp.process(settings) // Repl starts and goes in loop of R.E.P.L - Option(sparkContext).map(_.stop) + Option(sparkContext).foreach(_.stop) } } http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 76a66c1..d1d25b7 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -86,15 +86,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) echo("Type :help for more information.") } - /** Add repl commands that needs to be blocked. e.g. 
reset */ - private val blockedCommands = Set[String]() - - /** Standard commands */ - lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] = -standardCommands.filter(cmd => !blockedCommands(cmd.name)) - /** Available commands */ - override def commands: List[LoopCommand] = sparkStandardCommands + override def commands: List[LoopCommand] = standardCommands /** * We override `loadFiles` because we need to initialize Spark *before* the REPL http://git-wip-us.apache.org/repos/asf/spark/blob/08e1b78f/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 121a02a..c7ae194 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -21,12 +21,12 @@ import java.io._ import java.net.URLClassLoader import scala.collection.mutable.ArrayBuffer -import org.apache.commons.lang3.StringEscapeUtils + import org.apache.log4j.{Level, LogManager} + import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.util.Utils class ReplSuite extends
spark git commit: [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases
Repository: spark Updated Branches: refs/heads/master 181261a81 -> f561a76b2 [SPARK-20548][FLAKY-TEST] share one REPL instance among REPL test cases ## What changes were proposed in this pull request? `ReplSuite.newProductSeqEncoder with REPL defined class` was flaky and throws OOM exception frequently. By analyzing the heap dump, we found the reason is that, in each test case of `ReplSuite`, we create a REPL instance, which creates a classloader and loads a lot of classes related to `SparkContext`. More details please see https://github.com/apache/spark/pull/17833#issuecomment-298711435. In this PR, we create a new test suite, `SingletonReplSuite`, which shares one REPL instances among all the test cases. Then we move most of the tests from `ReplSuite` to `SingletonReplSuite`, to avoid creating a lot of REPL instances and reduce memory footprint. ## How was this patch tested? test only change Author: Wenchen FanCloses #17844 from cloud-fan/flaky-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f561a76b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f561a76b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f561a76b Branch: refs/heads/master Commit: f561a76b2f895dea52f228a9376948242c3331ad Parents: 181261a Author: Wenchen Fan Authored: Wed May 10 00:09:35 2017 +0800 Committer: Wenchen Fan Committed: Wed May 10 00:09:35 2017 +0800 -- .../main/scala/org/apache/spark/repl/Main.scala | 2 +- .../org/apache/spark/repl/SparkILoop.scala | 9 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 272 + .../apache/spark/repl/SingletonReplSuite.scala | 408 +++ 4 files changed, 412 insertions(+), 279 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 39fc621..b8b38e8 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -68,7 +68,7 @@ object Main extends Logging { if (!hasErrors) { interp.process(settings) // Repl starts and goes in loop of R.E.P.L - Option(sparkContext).map(_.stop) + Option(sparkContext).foreach(_.stop) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 76a66c1..d1d25b7 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -86,15 +86,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) echo("Type :help for more information.") } - /** Add repl commands that needs to be blocked. e.g. 
reset */ - private val blockedCommands = Set[String]() - - /** Standard commands */ - lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] = -standardCommands.filter(cmd => !blockedCommands(cmd.name)) - /** Available commands */ - override def commands: List[LoopCommand] = sparkStandardCommands + override def commands: List[LoopCommand] = standardCommands /** * We override `loadFiles` because we need to initialize Spark *before* the REPL http://git-wip-us.apache.org/repos/asf/spark/blob/f561a76b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 8fe2708..c7ae194 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -21,12 +21,12 @@ import java.io._ import java.net.URLClassLoader import scala.collection.mutable.ArrayBuffer -import org.apache.commons.lang3.StringEscapeUtils + import org.apache.log4j.{Level, LogManager} + import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.util.Utils class ReplSuite extends SparkFunSuite { @@ -148,71 +148,6 @@ class
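The pattern behind this fix, paying a heavyweight setup cost once per suite rather than once per test, generalizes beyond the REPL. A rough sketch of the idea in plain ScalaTest; all names here are illustrative stand-ins, not the actual `SingletonReplSuite` code:

```scala
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Stand-in for an expensive resource such as a REPL with its own
// classloader; creating one of these per test is what caused the OOMs.
class ExpensiveHarness {
  def run(code: String): String = s"ran: $code"
  def close(): Unit = ()
}

class SingletonStyleSuite extends FunSuite with BeforeAndAfterAll {
  private var harness: ExpensiveHarness = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    harness = new ExpensiveHarness  // created once, shared by every test case
  }

  override def afterAll(): Unit = {
    try harness.close() finally super.afterAll()
  }

  test("first case reuses the shared harness") {
    assert(harness.run("1 + 1").nonEmpty)
  }

  test("second case reuses it too, instead of building a fresh one") {
    assert(harness.run("2 + 2").nonEmpty)
  }
}
```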
spark git commit: [SPARK-20355] Add per application spark version on the history server headerpage
Repository: spark
Updated Branches:
  refs/heads/master 714811d0b -> 181261a81

[SPARK-20355] Add per application spark version on the history server headerpage

## What changes were proposed in this pull request?

The Spark version for a specific application is not displayed on the history page today. It would be nice to show the version for an application when clicking through to it. There is already a way to get it, since SparkListenerLogStart records the application version, so it is straightforward to listen to this event and surface the value on the UI. For example, the event log records:

{"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}

Screenshots of the change:
https://cloud.githubusercontent.com/assets/8295799/25092650/41f3970a-2354-11e7-9b0d-4646d0adeb61.png
https://cloud.githubusercontent.com/assets/8295799/25092743/9f9e2f28-2354-11e7-9605-f2f1c63f21fe.png

Modified the SparkUI for the history server to listen to the SparkListenerLogStart event, extract the version, and print it.

## How was this patch tested?

Manual testing of the UI page; see the screenshots linked above.

Author: Sanket

Closes #17658 from redsanket/SPARK-20355.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/181261a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/181261a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/181261a8

Branch: refs/heads/master
Commit: 181261a81d592b93181135a8267570e0c9ab2243
Parents: 714811d
Author: Sanket
Authored: Tue May 9 09:30:09 2017 -0500
Committer: Tom Graves
Committed: Tue May 9 09:30:09 2017 -0500

--
 .../history/ApplicationHistoryProvider.scala   | 3 ++-
 .../spark/deploy/history/FsHistoryProvider.scala | 17 -
 .../spark/scheduler/ApplicationEventListener.scala | 7 +++
 .../spark/scheduler/EventLoggingListener.scala | 13 ++---
 .../org/apache/spark/scheduler/SparkListener.scala | 4 ++--
 .../apache/spark/scheduler/SparkListenerBus.scala | 1 -
 .../status/api/v1/ApplicationListResource.scala| 3 ++-
 .../scala/org/apache/spark/status/api/v1/api.scala | 3 ++-
 .../main/scala/org/apache/spark/ui/SparkUI.scala | 6 +-
 .../main/scala/org/apache/spark/ui/UIUtils.scala | 2 +-
 .../application_list_json_expectation.json | 10 ++
 .../completed_app_list_json_expectation.json | 11 +++
 .../limit_app_list_json_expectation.json | 3 +++
 .../maxDate2_app_list_json_expectation.json| 1 +
 .../maxDate_app_list_json_expectation.json | 2 ++
 .../maxEndDate_app_list_json_expectation.json | 7 +++
 ...e_and_maxEndDate_app_list_json_expectation.json | 4 
 .../minDate_app_list_json_expectation.json | 8 
 ...e_and_maxEndDate_app_list_json_expectation.json | 4 
 .../minEndDate_app_list_json_expectation.json | 6 +-
 .../one_app_json_expectation.json | 1 +
 .../one_app_multi_attempt_json_expectation.json| 2 ++
 .../deploy/history/ApplicationCacheSuite.scala | 2 +-
 .../deploy/history/FsHistoryProviderSuite.scala| 4 ++--
 project/MimaExcludes.scala | 3 +++
 25 files changed, 107 insertions(+), 20 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/181261a8/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 6d8758a..5cb48ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -30,7 +30,8 @@ private[spark] case class ApplicationAttemptInfo( endTime: Long, lastUpdated: Long, sparkUser: String, -completed: Boolean = false) +completed: Boolean = false, +appSparkVersion: String) private[spark] case class ApplicationHistoryInfo( id: String, http://git-wip-us.apache.org/repos/asf/spark/blob/181261a8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
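As the quoted JSON suggests, the application's Spark version is written as the first `SparkListenerLogStart` line of its event log. A back-of-the-envelope sketch of recovering it outside Spark with plain string handling; the helper object and file path are hypothetical, not part of this patch:

```scala
import scala.io.Source

object EventLogVersion {
  // Pull "Spark Version" out of the SparkListenerLogStart line, e.g.
  // {"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}
  def appSparkVersion(eventLogPath: String): Option[String] = {
    val pattern = "\"Spark Version\"\\s*:\\s*\"([^\"]+)\"".r
    val source = Source.fromFile(eventLogPath)
    try {
      source.getLines()
        .find(_.contains("SparkListenerLogStart"))
        .flatMap(line => pattern.findFirstMatchIn(line).map(_.group(1)))
    } finally {
      source.close()
    }
  }
}
```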
spark git commit: [SPARK-20311][SQL] Support aliases for table value functions
Repository: spark Updated Branches: refs/heads/branch-2.2 b3309676b -> 272d2a10d [SPARK-20311][SQL] Support aliases for table value functions ## What changes were proposed in this pull request? This pr added parsing rules to support aliases in table value functions. ## How was this patch tested? Added tests in `PlanParserSuite`. Author: Takeshi YamamuroCloses #17666 from maropu/SPARK-20311. (cherry picked from commit 714811d0b5bcb5d47c39782ff74f898d276ecc59) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/272d2a10 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/272d2a10 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/272d2a10 Branch: refs/heads/branch-2.2 Commit: 272d2a10d70588e1f80cc6579d4ec3c44b5bbfc2 Parents: b330967 Author: Takeshi Yamamuro Authored: Tue May 9 20:22:51 2017 +0800 Committer: Wenchen Fan Committed: Tue May 9 20:23:53 2017 +0800 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 -- .../analysis/ResolveTableValuedFunctions.scala | 22 +--- .../sql/catalyst/analysis/unresolved.scala | 10 +++-- .../spark/sql/catalyst/parser/AstBuilder.scala | 17 +++ .../sql/catalyst/analysis/AnalysisSuite.scala | 14 - .../sql/catalyst/parser/PlanParserSuite.scala | 13 +++- 6 files changed, 79 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/272d2a10/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 1ecb3d1..15e4dd4 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -472,15 +472,23 @@ identifierComment ; relationPrimary -: tableIdentifier sample? (AS? strictIdentifier)? #tableName -| '(' queryNoWith ')' sample? (AS? strictIdentifier)? #aliasedQuery -| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation -| inlineTable #inlineTableDefault2 -| identifier '(' (expression (',' expression)*)? ')' #tableValuedFunction +: tableIdentifier sample? (AS? strictIdentifier)? #tableName +| '(' queryNoWith ')' sample? (AS? strictIdentifier)? #aliasedQuery +| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation +| inlineTable #inlineTableDefault2 +| functionTable#tableValuedFunction ; inlineTable -: VALUES expression (',' expression)* (AS? identifier identifierList?)? +: VALUES expression (',' expression)* tableAlias +; + +functionTable +: identifier '(' (expression (',' expression)*)? ')' tableAlias +; + +tableAlias +: (AS? identifier identifierList?)? 
; rowFormat http://git-wip-us.apache.org/repos/asf/spark/blob/272d2a10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala index de6de24..dad1340 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range} +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Range} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.{DataType, IntegerType, LongType} @@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) => - builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT))
spark git commit: [SPARK-20311][SQL] Support aliases for table value functions
Repository: spark Updated Branches: refs/heads/master 0d00c768a -> 714811d0b [SPARK-20311][SQL] Support aliases for table value functions ## What changes were proposed in this pull request? This pr added parsing rules to support aliases in table value functions. ## How was this patch tested? Added tests in `PlanParserSuite`. Author: Takeshi YamamuroCloses #17666 from maropu/SPARK-20311. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/714811d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/714811d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/714811d0 Branch: refs/heads/master Commit: 714811d0b5bcb5d47c39782ff74f898d276ecc59 Parents: 0d00c76 Author: Takeshi Yamamuro Authored: Tue May 9 20:22:51 2017 +0800 Committer: Wenchen Fan Committed: Tue May 9 20:22:51 2017 +0800 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 20 -- .../analysis/ResolveTableValuedFunctions.scala | 22 +--- .../sql/catalyst/analysis/unresolved.scala | 10 +++-- .../spark/sql/catalyst/parser/AstBuilder.scala | 17 +++ .../sql/catalyst/analysis/AnalysisSuite.scala | 14 - .../sql/catalyst/parser/PlanParserSuite.scala | 13 +++- 6 files changed, 79 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/714811d0/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 14c511f..41daf58 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -472,15 +472,23 @@ identifierComment ; relationPrimary -: tableIdentifier sample? (AS? strictIdentifier)? #tableName -| '(' queryNoWith ')' sample? (AS? strictIdentifier)? #aliasedQuery -| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation -| inlineTable #inlineTableDefault2 -| identifier '(' (expression (',' expression)*)? ')' #tableValuedFunction +: tableIdentifier sample? (AS? strictIdentifier)? #tableName +| '(' queryNoWith ')' sample? (AS? strictIdentifier)? #aliasedQuery +| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation +| inlineTable #inlineTableDefault2 +| functionTable#tableValuedFunction ; inlineTable -: VALUES expression (',' expression)* (AS? identifier identifierList?)? +: VALUES expression (',' expression)* tableAlias +; + +functionTable +: identifier '(' (expression (',' expression)*)? ')' tableAlias +; + +tableAlias +: (AS? identifier identifierList?)? 
; rowFormat http://git-wip-us.apache.org/repos/asf/spark/blob/714811d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala index de6de24..dad1340 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range} +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Range} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.{DataType, IntegerType, LongType} @@ -105,7 +105,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) => - builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match { + val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match { case Some(tvf)
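Concretely, the new `tableAlias` rule on `functionTable` accepts SQL along the following lines. A small sketch, assuming an active `spark` session; before this patch the parser rejected an alias after a table-valued function, and the optional column list now renames the function's output as well:

```scala
// Alias the range() table-valued function as `t` and its output column as `a`.
val df = spark.sql(
  """
    |SELECT t.a
    |FROM range(10) AS t(a)
    |WHERE t.a > 5
  """.stripMargin)
df.show()
```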
spark git commit: [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive
Repository: spark Updated Branches: refs/heads/branch-2.2 4b7aa0b1d -> b3309676b [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive ## What changes were proposed in this pull request? So far, we do not drop all the cataloged objects after each package. Sometimes, we might hit strange test case errors because the previous test suite did not drop the cataloged/temporary objects (tables/functions/database). At least, we can first clean up the environment when completing the package of `sql/core` and `sql/hive`. ## How was this patch tested? N/A Author: Xiao LiCloses #17908 from gatorsmile/reset. (cherry picked from commit 0d00c768a860fc03402c8f0c9081b8147c29133e) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3309676 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3309676 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3309676 Branch: refs/heads/branch-2.2 Commit: b3309676bb83a80d38b916066d046866a6f42ef0 Parents: 4b7aa0b Author: Xiao Li Authored: Tue May 9 20:10:50 2017 +0800 Committer: Wenchen Fan Committed: Tue May 9 20:11:08 2017 +0800 -- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala| 3 ++- .../scala/org/apache/spark/sql/test/SharedSQLContext.scala| 1 + .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 7 +-- 3 files changed, 4 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6c6d600..18e5146 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1251,9 +1251,10 @@ class SessionCatalog( dropTempFunction(func.funcName, ignoreIfNotExists = false) } } -tempTables.clear() +clearTempTables() globalTempViewManager.clear() functionRegistry.clear() +tableRelationCache.invalidateAll() // restore built-in functions FunctionRegistry.builtin.listFunction().foreach { f => val expressionInfo = FunctionRegistry.builtin.lookupFunction(f) http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 81c69a3..7cea4c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -74,6 +74,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua protected override def afterAll(): Unit = { super.afterAll() if (_spark != null) { + _spark.sessionState.catalog.reset() _spark.stop() _spark = null } http://git-wip-us.apache.org/repos/asf/spark/blob/b3309676/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index d9bb1f8..ee9ac21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -488,14 +488,9 @@ private[hive] class TestHiveSparkSession( sharedState.cacheManager.clearCache() loadedTables.clear() - sessionState.catalog.clearTempTables() - sessionState.catalog.tableRelationCache.invalidateAll() - + sessionState.catalog.reset() metadataHive.reset() - FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). -foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } - // HDFS root scratch dir requires the write all (733) permission. For each connecting user, // an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, with // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
spark git commit: [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive
Repository: spark Updated Branches: refs/heads/master b8733e0ad -> 0d00c768a [SPARK-20667][SQL][TESTS] Cleanup the cataloged metadata after completing the package of sql/core and sql/hive ## What changes were proposed in this pull request? So far, we do not drop all the cataloged objects after each package. Sometimes, we might hit strange test case errors because the previous test suite did not drop the cataloged/temporary objects (tables/functions/database). At least, we can first clean up the environment when completing the package of `sql/core` and `sql/hive`. ## How was this patch tested? N/A Author: Xiao LiCloses #17908 from gatorsmile/reset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d00c768 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d00c768 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d00c768 Branch: refs/heads/master Commit: 0d00c768a860fc03402c8f0c9081b8147c29133e Parents: b8733e0 Author: Xiao Li Authored: Tue May 9 20:10:50 2017 +0800 Committer: Wenchen Fan Committed: Tue May 9 20:10:50 2017 +0800 -- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala| 3 ++- .../scala/org/apache/spark/sql/test/SharedSQLContext.scala| 1 + .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 7 +-- 3 files changed, 4 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6c6d600..18e5146 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1251,9 +1251,10 @@ class SessionCatalog( dropTempFunction(func.funcName, ignoreIfNotExists = false) } } -tempTables.clear() +clearTempTables() globalTempViewManager.clear() functionRegistry.clear() +tableRelationCache.invalidateAll() // restore built-in functions FunctionRegistry.builtin.listFunction().foreach { f => val expressionInfo = FunctionRegistry.builtin.lookupFunction(f) http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 81c69a3..7cea4c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -74,6 +74,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua protected override def afterAll(): Unit = { super.afterAll() if (_spark != null) { + _spark.sessionState.catalog.reset() _spark.stop() _spark = null } http://git-wip-us.apache.org/repos/asf/spark/blob/0d00c768/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index d9bb1f8..ee9ac21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -488,14 +488,9 @@ 
private[hive] class TestHiveSparkSession(
     sharedState.cacheManager.clearCache()
     loadedTables.clear()
-    sessionState.catalog.clearTempTables()
-    sessionState.catalog.tableRelationCache.invalidateAll()
-
+    sessionState.catalog.reset()
     metadataHive.reset()

-    FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
-      foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
-
     // HDFS root scratch dir requires the write all (733) permission. For each connecting user,
     // an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, with
     // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
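For suite authors, the visible contract is that `SharedSQLContext.afterAll` now calls `sessionState.catalog.reset()` before stopping the session, so temporary objects registered in one suite no longer leak into the next. A hedged sketch of a suite relying on that cleanup; the suite name is illustrative, and because `sessionState` is internal, this assumes test code living under `org.apache.spark.sql` as Spark's own suites do:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSQLContext

// Temp views created here are dropped by the shared afterAll via
// sessionState.catalog.reset(), so later suites start from a clean catalog.
class CatalogHygieneSuite extends QueryTest with SharedSQLContext {

  test("temp views are visible within the suite") {
    spark.range(1).createOrReplaceTempView("tmp_view")
    assert(spark.catalog.listTables().collect().exists(_.name == "tmp_view"))
  }
}
```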
spark git commit: [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML
Repository: spark Updated Branches: refs/heads/branch-2.2 4bbfad44e -> 4b7aa0b1d [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML ## What changes were proposed in this pull request? Remove ML methods we deprecated in 2.1. ## How was this patch tested? Existing tests. Author: Yanbo LiangCloses #17867 from yanboliang/spark-20606. (cherry picked from commit b8733e0ad9f5a700f385e210450fd2c10137293e) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b7aa0b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b7aa0b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b7aa0b1 Branch: refs/heads/branch-2.2 Commit: 4b7aa0b1dbd85e2238acba45e8f94c097358fb72 Parents: 4bbfad4 Author: Yanbo Liang Authored: Tue May 9 17:30:37 2017 +0800 Committer: Yanbo Liang Committed: Tue May 9 17:30:50 2017 +0800 -- .../classification/DecisionTreeClassifier.scala | 18 ++-- .../spark/ml/classification/GBTClassifier.scala | 24 ++--- .../classification/RandomForestClassifier.scala | 24 ++--- .../ml/regression/DecisionTreeRegressor.scala | 18 ++-- .../spark/ml/regression/GBTRegressor.scala | 24 ++--- .../ml/regression/RandomForestRegressor.scala | 24 ++--- .../org/apache/spark/ml/tree/treeParams.scala | 105 --- .../org/apache/spark/ml/util/ReadWrite.scala| 16 --- project/MimaExcludes.scala | 68 python/pyspark/ml/util.py | 32 -- 10 files changed, 134 insertions(+), 219 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4b7aa0b1/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 9f60f08..5fb105c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -54,27 +54,27 @@ class DecisionTreeClassifier @Since("1.4.0") ( /** @group setParam */ @Since("1.4.0") - override def setMaxDepth(value: Int): this.type = set(maxDepth, value) + def setMaxDepth(value: Int): this.type = set(maxDepth, value) /** @group setParam */ @Since("1.4.0") - override def setMaxBins(value: Int): this.type = set(maxBins, value) + def setMaxBins(value: Int): this.type = set(maxBins, value) /** @group setParam */ @Since("1.4.0") - override def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value) + def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value) /** @group setParam */ @Since("1.4.0") - override def setMinInfoGain(value: Double): this.type = set(minInfoGain, value) + def setMinInfoGain(value: Double): this.type = set(minInfoGain, value) /** @group expertSetParam */ @Since("1.4.0") - override def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value) + def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value) /** @group expertSetParam */ @Since("1.4.0") - override def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value) + def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value) /** * Specifies how often to checkpoint the cached node IDs. 
@@ -86,15 +86,15 @@ class DecisionTreeClassifier @Since("1.4.0") ( * @group setParam */ @Since("1.4.0") - override def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) + def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) /** @group setParam */ @Since("1.4.0") - override def setImpurity(value: String): this.type = set(impurity, value) + def setImpurity(value: String): this.type = set(impurity, value) /** @group setParam */ @Since("1.6.0") - override def setSeed(value: Long): this.type = set(seed, value) + def setSeed(value: Long): this.type = set(seed, value) override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = { val categoricalFeatures: Map[Int, Int] = http://git-wip-us.apache.org/repos/asf/spark/blob/4b7aa0b1/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
spark git commit: [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML
Repository: spark Updated Branches: refs/heads/master be53a7835 -> b8733e0ad [SPARK-20606][ML] ML 2.2 QA: Remove deprecated methods for ML ## What changes were proposed in this pull request? Remove ML methods we deprecated in 2.1. ## How was this patch tested? Existing tests. Author: Yanbo LiangCloses #17867 from yanboliang/spark-20606. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8733e0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8733e0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8733e0a Branch: refs/heads/master Commit: b8733e0ad9f5a700f385e210450fd2c10137293e Parents: be53a78 Author: Yanbo Liang Authored: Tue May 9 17:30:37 2017 +0800 Committer: Yanbo Liang Committed: Tue May 9 17:30:37 2017 +0800 -- .../classification/DecisionTreeClassifier.scala | 18 ++-- .../spark/ml/classification/GBTClassifier.scala | 24 ++--- .../classification/RandomForestClassifier.scala | 24 ++--- .../ml/regression/DecisionTreeRegressor.scala | 18 ++-- .../spark/ml/regression/GBTRegressor.scala | 24 ++--- .../ml/regression/RandomForestRegressor.scala | 24 ++--- .../org/apache/spark/ml/tree/treeParams.scala | 105 --- .../org/apache/spark/ml/util/ReadWrite.scala| 16 --- project/MimaExcludes.scala | 68 python/pyspark/ml/util.py | 32 -- 10 files changed, 134 insertions(+), 219 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b8733e0a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 9f60f08..5fb105c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -54,27 +54,27 @@ class DecisionTreeClassifier @Since("1.4.0") ( /** @group setParam */ @Since("1.4.0") - override def setMaxDepth(value: Int): this.type = set(maxDepth, value) + def setMaxDepth(value: Int): this.type = set(maxDepth, value) /** @group setParam */ @Since("1.4.0") - override def setMaxBins(value: Int): this.type = set(maxBins, value) + def setMaxBins(value: Int): this.type = set(maxBins, value) /** @group setParam */ @Since("1.4.0") - override def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value) + def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value) /** @group setParam */ @Since("1.4.0") - override def setMinInfoGain(value: Double): this.type = set(minInfoGain, value) + def setMinInfoGain(value: Double): this.type = set(minInfoGain, value) /** @group expertSetParam */ @Since("1.4.0") - override def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value) + def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value) /** @group expertSetParam */ @Since("1.4.0") - override def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value) + def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value) /** * Specifies how often to checkpoint the cached node IDs. 
@@ -86,15 +86,15 @@ class DecisionTreeClassifier @Since("1.4.0") ( * @group setParam */ @Since("1.4.0") - override def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) + def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) /** @group setParam */ @Since("1.4.0") - override def setImpurity(value: String): this.type = set(impurity, value) + def setImpurity(value: String): this.type = set(impurity, value) /** @group setParam */ @Since("1.6.0") - override def setSeed(value: Long): this.type = set(seed, value) + def setSeed(value: Long): this.type = set(seed, value) override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = { val categoricalFeatures: Map[Int, Int] = http://git-wip-us.apache.org/repos/asf/spark/blob/b8733e0a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala index ade0960..263ed10 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala +++
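For downstream code, the setters survive on the concrete estimators; the diff only drops the deprecated trait-level definitions these used to `override`. So the usual chained-builder style keeps compiling unchanged. A small sketch with illustrative parameter values:

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier

// Setter chaining is unchanged for callers; only the deprecated
// trait-level setters that these methods used to override are gone.
val dt = new DecisionTreeClassifier()
  .setMaxDepth(5)
  .setMaxBins(32)
  .setMinInstancesPerNode(2)
  .setSeed(42L)

println(dt.explainParams())
```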
spark-website git commit: Direct 2.1.0, 2.0.1 downloads to archive; use https links for download; Apache Hadoop; remove stale download logic
Repository: spark-website Updated Branches: refs/heads/asf-site 7b32b181f -> b54c4f3fa Direct 2.1.0, 2.0.1 downloads to archive; use https links for download; Apache Hadoop; remove stale download logic Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/b54c4f3f Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/b54c4f3f Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/b54c4f3f Branch: refs/heads/asf-site Commit: b54c4f3faf837a3e772af989eb7df0b64698e557 Parents: 7b32b18 Author: Sean OwenAuthored: Tue May 9 10:09:21 2017 +0100 Committer: Sean Owen Committed: Tue May 9 10:09:21 2017 +0100 -- js/downloads.js | 41 - 1 file changed, 16 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/b54c4f3f/js/downloads.js -- diff --git a/js/downloads.js b/js/downloads.js index 81dcbfc..d308389 100644 --- a/js/downloads.js +++ b/js/downloads.js @@ -8,14 +8,14 @@ function addRelease(version, releaseDate, packages, stable) { } var sources = {pretty: "Source Code", tag: "sources"}; -var hadoopFree = {pretty: "Pre-build with user-provided Hadoop [can use with most Hadoop distributions]", tag: "without-hadoop"}; -var hadoop1 = {pretty: "Pre-built for Hadoop 1.X", tag: "hadoop1"}; +var hadoopFree = {pretty: "Pre-build with user-provided Apache Hadoop", tag: "without-hadoop"}; +var hadoop1 = {pretty: "Pre-built for Apache Hadoop 1.X", tag: "hadoop1"}; var cdh4 = {pretty: "Pre-built for CDH 4", tag: "cdh4"}; -var hadoop2 = {pretty: "Pre-built for Hadoop 2.2", tag: "hadoop2"}; -var hadoop2p3 = {pretty: "Pre-built for Hadoop 2.3", tag: "hadoop2.3"}; -var hadoop2p4 = {pretty: "Pre-built for Hadoop 2.4", tag: "hadoop2.4"}; -var hadoop2p6 = {pretty: "Pre-built for Hadoop 2.6", tag: "hadoop2.6"}; -var hadoop2p7 = {pretty: "Pre-built for Hadoop 2.7 and later", tag: "hadoop2.7"}; +var hadoop2 = {pretty: "Pre-built for Apache Hadoop 2.2", tag: "hadoop2"}; +var hadoop2p3 = {pretty: "Pre-built for Apache Hadoop 2.3", tag: "hadoop2.3"}; +var hadoop2p4 = {pretty: "Pre-built for Apache Hadoop 2.4", tag: "hadoop2.4"}; +var hadoop2p6 = {pretty: "Pre-built for Apache Hadoop 2.6", tag: "hadoop2.6"}; +var hadoop2p7 = {pretty: "Pre-built for Apache Hadoop 2.7 and later", tag: "hadoop2.7"}; // 1.4.0+ var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, hadoop1, cdh4, sources]; @@ -135,7 +135,7 @@ function onVersionSelect() { append(packageSelect, option); } - var href = "http://www.apache.org/dist/spark/spark-; + version + "/"; + var href = "https://www.apache.org/dist/spark/spark-; + version + "/"; var link = "" + versionShort(version) + " signatures and checksums"; append(verifyLink, link); @@ -152,13 +152,8 @@ function onPackageSelect() { var pkg = getSelectedValue(packageSelect); - //if (pkg.toLowerCase().indexOf("mapr") > -1) { - // var external = "External Download (MAY CONTAIN INCOMPATIBLE LICENSES)"; - // append(downloadSelect, "" + external + ""); - //} else { -append(downloadSelect, "Direct Download"); -append(downloadSelect, "Select Apache Mirror"); - //} + append(downloadSelect, "Direct Download"); + append(downloadSelect, "Select Apache Mirror"); updateDownloadLink(); } @@ -184,18 +179,14 @@ function updateDownloadLink() { .replace(/\$pkg/g, pkg) .replace(/-bin-sources/, ""); // special case for source packages - var link = "http://d3kbcqa49mib13.cloudfront.net/$artifact;; - if (version < "0.8.0") { -link = "http://spark-project.org/download/$artifact;; - } - if 
(pkg.toLowerCase().indexOf("mapr") > -1) {
-    link = "http://package.mapr.com/tools/apache-spark/$ver/$artifact"
-  } else if (download == "apache") {
+  var link = "https://d3kbcqa49mib13.cloudfront.net/$artifact";
+  if (download == "apache") {
     if (version < "1.6.3" ||
-        (version >= "2.0.0" && version < "2.0.1")) {
-      link = "http://archive.apache.org/dist/spark/spark-$ver/$artifact";
+        (version >= "2.0.0" && version <= "2.0.1") ||
+        (version >= "2.1.0" && version <= "2.1.0")) {
+      link = "https://archive.apache.org/dist/spark/spark-$ver/$artifact";
     } else {
-      link = "http://www.apache.org/dyn/closer.lua/spark/spark-$ver/$artifact";
+      link = "https://www.apache.org/dyn/closer.lua/spark/spark-$ver/$artifact";
     }
   }
   link = link
spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ca3f7edba -> 4bbfad44e

[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously the argmax function assumed that at least one value was defined if the vector size was greater than zero.

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean

Closes #17877 from jonmclean/vectorArgmaxIndexBug.

(cherry picked from commit be53a78352ae7c70d8a07d0df24574b3e3129b4a)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bbfad44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bbfad44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bbfad44

Branch: refs/heads/branch-2.2
Commit: 4bbfad44e426365ad9f4941d68c110523b17ea6d
Parents: ca3f7ed
Author: Jon McLean
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen
Committed: Tue May 9 09:47:58 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++
 4 files changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 8e166ba..3fbc095 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
     if (size == 0) {
       -1
+    } else if (numActives == 0) {
+      0
     } else {
       // Find the max active entry.
var maxIdx = indices(0) http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala -- diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala index dfbdaf1..4cd91af 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala @@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite { val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0)) assert(vec8.argmax === 0) + +// Check for case when sparse vector is non-empty but the values are empty +val vec9 = Vectors.sparse(100, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector] +assert(vec9.argmax === 0) + +val vec10 = Vectors.sparse(1, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector] +assert(vec10.argmax === 0) } test("vector equals") { http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 723addc..f063420 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") ( override def argmax: Int = { if (size == 0) { -1 +} else if (numActives == 0) { + 0 } else { // Find the max active entry. var maxIdx = indices(0) http://git-wip-us.apache.org/repos/asf/spark/blob/4bbfad44/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 71a3cea..6172cff 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging { val vec8 = Vectors.sparse(5, Array(1, 2),
spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException
Repository: spark Updated Branches: refs/heads/branch-2.1 a1112c615 -> f7a91a17e

[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously the argmax function assumed that at least one value was defined if the vector size was greater than zero.

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean

Closes #17877 from jonmclean/vectorArgmaxIndexBug.

(cherry picked from commit be53a78352ae7c70d8a07d0df24574b3e3129b4a)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7a91a17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7a91a17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7a91a17
Branch: refs/heads/branch-2.1
Commit: f7a91a17e8e20965b3e634e611690a96f72cec6b
Parents: a1112c6
Author: Jon McLean
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen
Committed: Tue May 9 09:48:09 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala  | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala   | 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala   | 7 +++
 4 files changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 22e4ec6..7bc2cb1 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
     if (size == 0) {
       -1
+    } else if (numActives == 0) {
+      0
     } else {
       // Find the max active entry.
       var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
--
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
index ea22c27..bd71656 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
@@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite {
     val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
     assert(vec8.argmax === 0)
+
+    // Check for case when sparse vector is non-empty but the values are empty
+    val vec9 = Vectors.sparse(100, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector]
+    assert(vec9.argmax === 0)
+
+    val vec10 = Vectors.sparse(1, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector]
+    assert(vec10.argmax === 0)
   }

   test("vector equals") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 63ea9d3..5282849 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") (
   override def argmax: Int = {
     if (size == 0) {
       -1
+    } else if (numActives == 0) {
+      0
     } else {
       // Find the max active entry.
       var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a91a17/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 71a3cea..6172cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging {
     val vec8 = Vectors.sparse(5, Array(1, 2),
spark git commit: [SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException
Repository: spark Updated Branches: refs/heads/master 10b00abad -> be53a7835

[SPARK-20615][ML][TEST] SparseVector.argmax throws IndexOutOfBoundsException

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously the argmax function assumed that at least one value was defined if the vector size was greater than zero.

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean

Closes #17877 from jonmclean/vectorArgmaxIndexBug.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be53a783
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be53a783
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be53a783
Branch: refs/heads/master
Commit: be53a78352ae7c70d8a07d0df24574b3e3129b4a
Parents: 10b00ab
Author: Jon McLean
Authored: Tue May 9 09:47:50 2017 +0100
Committer: Sean Owen
Committed: Tue May 9 09:47:50 2017 +0100

--
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala  | 2 ++
 .../test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala | 7 +++
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala   | 2 ++
 .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala   | 7 +++
 4 files changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
--
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 8e166ba..3fbc095 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -657,6 +657,8 @@ class SparseVector @Since("2.0.0") (
   override def argmax: Int = {
     if (size == 0) {
       -1
+    } else if (numActives == 0) {
+      0
     } else {
       // Find the max active entry.
       var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
--
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
index dfbdaf1..4cd91af 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/VectorsSuite.scala
@@ -125,6 +125,13 @@ class VectorsSuite extends SparkMLFunSuite {
     val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
     assert(vec8.argmax === 0)
+
+    // Check for case when sparse vector is non-empty but the values are empty
+    val vec9 = Vectors.sparse(100, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector]
+    assert(vec9.argmax === 0)
+
+    val vec10 = Vectors.sparse(1, Array.empty[Int], Array.empty[Double]).asInstanceOf[SparseVector]
+    assert(vec10.argmax === 0)
   }

   test("vector equals") {

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 723addc..f063420 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -846,6 +846,8 @@ class SparseVector @Since("1.0.0") (
   override def argmax: Int = {
     if (size == 0) {
       -1
+    } else if (numActives == 0) {
+      0
     } else {
       // Find the max active entry.
       var maxIdx = indices(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/be53a783/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 71a3cea..6172cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -122,6 +122,13 @@ class VectorsSuite extends SparkFunSuite with Logging {
     val vec8 = Vectors.sparse(5, Array(1, 2), Array(0.0, -1.0))
     assert(vec8.argmax === 0)
+
+    // Check for case when sparse vector is non-empty but the values are
spark git commit: [SPARK-20587][ML] Improve performance of ML ALS recommendForAll
Repository: spark Updated Branches: refs/heads/branch-2.2 72fca9a0a -> ca3f7edba

[SPARK-20587][ML] Improve performance of ML ALS recommendForAll

## What changes were proposed in this pull request?

This PR is a `DataFrame` version of #17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath

Closes #17845 from MLnick/ml-als-perf.

(cherry picked from commit 10b00abadf4a3473332eef996db7b66f491316f2)
Signed-off-by: Nick Pentreath

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca3f7edb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca3f7edb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca3f7edb
Branch: refs/heads/branch-2.2
Commit: ca3f7edbad6a2e7fcd1c1d3dbd1a522cd0d7c476
Parents: 72fca9a
Author: Nick Pentreath
Authored: Tue May 9 10:13:15 2017 +0200
Committer: Nick Pentreath
Committed: Tue May 9 10:13:36 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala | 71 ++--
 1 file changed, 64 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ca3f7edb/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef72..4a130e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom

@@ -356,6 +356,19 @@ class ALSModel private[ml] (
   /**
    * Makes recommendations for all users (or items).
+   *
+   * Note: the previous approach used for computing top-k recommendations
+   * used a cross-join followed by predicting a score for each row of the joined dataset.
+   * However, this results in exploding the size of intermediate data. While Spark SQL makes it
+   * relatively efficient, the approach implemented here is significantly more efficient.
+   *
+   * This approach groups factors into blocks and computes the top-k elements per block,
+   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * It then computes the global top-k by aggregating the per block top-k elements with
+   * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
+   * This is the DataFrame equivalent to the approach used in
+   * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
+   *
    * @param srcFactors src factors for which to generate recommendations
    * @param dstFactors dst factors used to make recommendations
    * @param srcOutputColumn name of the column for the source ID in the output DataFrame
@@ -372,11 +385,43 @@ class ALSModel private[ml] (
       num: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._

-    val ratings = srcFactors.crossJoin(dstFactors)
-      .select(
-        srcFactors("id"),
-        dstFactors("id"),
-        predict(srcFactors("features"), dstFactors("features")))
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
+      .flatMap { case (srcIter, dstIter) =>
+        val m = srcIter.size
+        val n = math.min(dstIter.size, num)
+        val output = new Array[(Int, Int, Float)](m * n)
+        var j = 0
+        val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+        srcIter.foreach { case (srcId, srcFactor) =>
+          dstIter.foreach { case (dstId, dstFactor) =>
+            /*
+             * The below code is equivalent to
+             * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
+             * This handwritten version is as or more efficient as BLAS calls in this case.
+             */
+            var score = 0.0f
+            var k = 0
+            while (k < rank) {
+              score += srcFactor(k) * dstFactor(k)
+              k += 1
+            }
+            pq += dstId
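The heart of this change is the per-block pass: score each (src, dst) pair with a hand-rolled dot product and keep only the best `num` candidates per source in a bounded queue, instead of materializing every score. A self-contained sketch of that step, with `scala.collection.mutable.PriorityQueue` standing in for Spark's private `BoundedPriorityQueue` (the function and parameter names are illustrative, not the actual `ALSModel` internals):

```scala
import scala.collection.mutable

// Per-block top-k: for each source factor, score every destination factor with a
// hand-rolled dot product and keep only the k best (dstId, score) candidates.
def blockTopK(
    srcBlock: Seq[(Int, Array[Float])],
    dstBlock: Seq[(Int, Array[Float])],
    rank: Int,
    k: Int): Seq[(Int, (Int, Float))] = {
  srcBlock.flatMap { case (srcId, srcFactor) =>
    // Reversed ordering makes this a min-heap on score, so the weakest
    // candidate is the one evicted when the queue exceeds k elements.
    val pq = mutable.PriorityQueue.empty[(Int, Float)](
      Ordering.by[(Int, Float), Float](_._2).reverse)
    dstBlock.foreach { case (dstId, dstFactor) =>
      var score = 0.0f
      var i = 0
      while (i < rank) {  // same hand-written dot product as the patch, in place of blas.sdot
        score += srcFactor(i) * dstFactor(i)
        i += 1
      }
      pq.enqueue((dstId, score))
      if (pq.size > k) pq.dequeue()  // drop the current minimum, keeping the queue bounded
    }
    pq.toList.sortBy(-_._2).map { case (dstId, score) => (srcId, (dstId, score)) }
  }
}
```

Candidates from different block pairs still overlap per source id, so a global reduction over these per-block winners has to follow.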
spark git commit: [SPARK-20587][ML] Improve performance of ML ALS recommendForAll
Repository: spark Updated Branches: refs/heads/master 807942476 -> 10b00abad

[SPARK-20587][ML] Improve performance of ML ALS recommendForAll

## What changes were proposed in this pull request?

This PR is a `DataFrame` version of #17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath

Closes #17845 from MLnick/ml-als-perf.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10b00aba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10b00aba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10b00aba
Branch: refs/heads/master
Commit: 10b00abadf4a3473332eef996db7b66f491316f2
Parents: 8079424
Author: Nick Pentreath
Authored: Tue May 9 10:13:15 2017 +0200
Committer: Nick Pentreath
Committed: Tue May 9 10:13:15 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala | 71 ++--
 1 file changed, 64 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/10b00aba/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 1562bf1..d626f04 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom

@@ -356,6 +356,19 @@ class ALSModel private[ml] (
   /**
    * Makes recommendations for all users (or items).
+   *
+   * Note: the previous approach used for computing top-k recommendations
+   * used a cross-join followed by predicting a score for each row of the joined dataset.
+   * However, this results in exploding the size of intermediate data. While Spark SQL makes it
+   * relatively efficient, the approach implemented here is significantly more efficient.
+   *
+   * This approach groups factors into blocks and computes the top-k elements per block,
+   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * It then computes the global top-k by aggregating the per block top-k elements with
+   * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
+   * This is the DataFrame equivalent to the approach used in
+   * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
+   *
    * @param srcFactors src factors for which to generate recommendations
    * @param dstFactors dst factors used to make recommendations
    * @param srcOutputColumn name of the column for the source ID in the output DataFrame
@@ -372,11 +385,43 @@ class ALSModel private[ml] (
       num: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._

-    val ratings = srcFactors.crossJoin(dstFactors)
-      .select(
-        srcFactors("id"),
-        dstFactors("id"),
-        predict(srcFactors("features"), dstFactors("features")))
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
+      .flatMap { case (srcIter, dstIter) =>
+        val m = srcIter.size
+        val n = math.min(dstIter.size, num)
+        val output = new Array[(Int, Int, Float)](m * n)
+        var j = 0
+        val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+        srcIter.foreach { case (srcId, srcFactor) =>
+          dstIter.foreach { case (dstId, dstFactor) =>
+            /*
+             * The below code is equivalent to
+             * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
+             * This handwritten version is as or more efficient as BLAS calls in this case.
+             */
+            var score = 0.0f
+            var k = 0
+            while (k < rank) {
+              score += srcFactor(k) * dstFactor(k)
+              k += 1
+            }
+            pq += dstId -> score
+          }
+          val pqIter = pq.iterator
+          var i = 0
+          while (i < n) {
+            val
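Since each source id meets every destination block in a different cross-join row, the per-block winners are only local. The `TopByKeyAggregator` named in the doc comment performs the global merge; a plain-collections equivalent of that reduction (illustrative names, local data standing in for the grouped Dataset):

```scala
// Global top-k: merge per-block candidates and keep the k best per source id.
def globalTopK(
    perBlockCandidates: Seq[(Int, (Int, Float))],
    k: Int): Map[Int, Seq[(Int, Float)]] = {
  perBlockCandidates
    .groupBy(_._1)  // gather every surviving candidate for one srcId
    .map { case (srcId, rows) =>
      srcId -> rows.map(_._2).sortBy(-_._2).take(k)  // best k by score, descending
    }
}
```

The sort here is cheap because each source contributes at most k candidates per block pair, which is exactly the point of bounding the queues in the first pass.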
spark git commit: [SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll
Repository: spark Updated Branches: refs/heads/master b952b44af -> 807942476

[SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

The recommendForAll of MLLIB ALS is very slow. GC is a key problem of the current method. The task uses the following code to keep the temp result:

val output = new Array[(Int, (Int, Double))](m*n)

m = n = 4096 (the default value; there is no method to set it), so output is about 4k * 4k * (4 + 4 + 8) = 256M. This is a large allocation that causes serious GC problems and frequently OOMs. Actually, we don't need to save all the temp results. Suppose we recommend topK (topK is about 10, or 20) products for each user; then we only need 4k * topK * (4 + 4 + 8) memory to save the temp result.

The test environment: 3 workers, each with 10 cores, 30G memory, and 1 executor.
The data: 480,000 users and 17,000 items.

BlockSize:     1024  2048  4096  8192
Old method:    245s  332s  488s  OOM
This solution: 121s  118s  117s  120s

Tested with the existing UT.

Author: Peng
Author: Peng Meng

Closes #17742 from mpjlu/OptimizeAls.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80794247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80794247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80794247
Branch: refs/heads/master
Commit: 8079424763c2043264f30a6898ce964379bd9b56
Parents: b952b44
Author: Peng
Authored: Tue May 9 10:05:49 2017 +0200
Committer: Nick Pentreath
Committed: Tue May 9 10:06:48 2017 +0200

--
 .../MatrixFactorizationModel.scala | 81 
 1 file changed, 50 insertions(+), 31 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/80794247/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 23045fa..d45866c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue

 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(rank, srcFeatures)
-    val dstBlocks = blockify(rank, dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-        val m = srcIds.length
-        val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
+    val srcBlocks = blockify(srcFeatures)
+    val dstBlocks = blockify(dstFeatures)
+    /**
+     * The previous approach used for computing top-k recommendations aimed to group
+     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+     * be used for efficiency. However, this causes excessive GC pressure due to the large
+     * arrays required for intermediate result storage, as well as a high sensitivity to the
+     * block size used.
+     * The following approach still groups factors into blocks, but instead computes the
+     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
+     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
+     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+     * any cost incurred from not using Level 3 BLAS operations.
+     */
+    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
+      val m = srcIter.size
+      val n = math.min(dstIter.size, num)
+      val output = new Array[(Int, (Int, Double))](m * n)
+      var j = 0
+      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
+      srcIter.foreach { case (srcId, srcFactor) =>
+        dstIter.foreach { case (dstId, dstFactor) =>
+          /*
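The arithmetic in the description is worth making concrete, since it is the whole motivation for the bounded queue. A small sketch of the estimate (the 16 bytes per entry counts only the two `Int`s and one `Double` of an `(Int, (Int, Double))` payload and ignores object headers, so the real pressure is higher):

```scala
object TempBufferEstimate {
  def main(args: Array[String]): Unit = {
    val bytesPerEntry = 4 + 4 + 8   // srcId: Int, dstId: Int, rating: Double
    val m, n = 4096                 // hard-coded block size in the old code path
    val topK = 20                   // typical number of recommendations per user

    val oldBytes = m.toLong * n * bytesPerEntry     // full m*n buffer per block pair
    val newBytes = m.toLong * topK * bytesPerEntry  // bounded top-k output per block pair

    println(s"old: ${oldBytes >> 20} MB per task")  // 256 MB, reallocated per block pair -> GC/OOM
    println(s"new: ${newBytes >> 20} MB per task")  // ~1 MB
  }
}
```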
spark git commit: [SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll
Repository: spark Updated Branches: refs/heads/branch-2.2 54e074349 -> 72fca9a0a

[SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

The recommendForAll of MLLIB ALS is very slow. GC is a key problem of the current method. The task uses the following code to keep the temp result:

val output = new Array[(Int, (Int, Double))](m*n)

m = n = 4096 (the default value; there is no method to set it), so output is about 4k * 4k * (4 + 4 + 8) = 256M. This is a large allocation that causes serious GC problems and frequently OOMs. Actually, we don't need to save all the temp results. Suppose we recommend topK (topK is about 10, or 20) products for each user; then we only need 4k * topK * (4 + 4 + 8) memory to save the temp result.

The test environment: 3 workers, each with 10 cores, 30G memory, and 1 executor.
The data: 480,000 users and 17,000 items.

BlockSize:     1024  2048  4096  8192
Old method:    245s  332s  488s  OOM
This solution: 121s  118s  117s  120s

Tested with the existing UT.

Author: Peng
Author: Peng Meng

Closes #17742 from mpjlu/OptimizeAls.

(cherry picked from commit 8079424763c2043264f30a6898ce964379bd9b56)
Signed-off-by: Nick Pentreath

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72fca9a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72fca9a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72fca9a0
Branch: refs/heads/branch-2.2
Commit: 72fca9a0a7a6dd2ab7c338fab9666b51cd981cce
Parents: 54e0743
Author: Peng
Authored: Tue May 9 10:05:49 2017 +0200
Committer: Nick Pentreath
Committed: Tue May 9 10:08:23 2017 +0200

--
 .../MatrixFactorizationModel.scala | 81 
 1 file changed, 50 insertions(+), 31 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/72fca9a0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 23045fa..d45866c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue

 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(rank, srcFeatures)
-    val dstBlocks = blockify(rank, dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-        val m = srcIds.length
-        val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
+    val srcBlocks = blockify(srcFeatures)
+    val dstBlocks = blockify(dstFeatures)
+    /**
+     * The previous approach used for computing top-k recommendations aimed to group
+     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+     * be used for efficiency. However, this causes excessive GC pressure due to the large
+     * arrays required for intermediate result storage, as well as a high sensitivity to the
+     * block size used.
+     * The following approach still groups factors into blocks, but instead computes the
+     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
+     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
+     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+     * any cost incurred from not using Level 3 BLAS operations.
+     */
+    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
+      val m = srcIter.size
+      val n = math.min(dstIter.size, num)
+      val output = new Array[(Int, (Int, Double))](m * n)
+      var j = 0
+      val pq = new BoundedPriorityQueue[(Int,
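Both the RDD and DataFrame versions lean on a `blockify` helper that, after this patch, no longer packs factors into matrices for gemm but simply chunks (id, factor) pairs, so that `cartesian`/`crossJoin` pairs blocks rather than single vectors. A minimal sketch of what such a helper can look like (the default size and body here are illustrative; the real helper is private to `MatrixFactorizationModel`):

```scala
import org.apache.spark.rdd.RDD

// Chunk (id, factor) pairs within each partition so the subsequent
// cartesian product runs over a few thousand blocks, not millions of vectors.
def blockify(
    features: RDD[(Int, Array[Double])],
    blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
  features.mapPartitions { iter =>
    iter.grouped(blockSize)
  }
}
```

Keeping the block size fixed is now far less risky than before: the benchmark table above shows the old gemm path degrading from 245s to OOM as the block size grows, while the bounded-queue path stays flat, because per-pair memory is capped at the top-k output rather than the full m*n score matrix.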