spark git commit: [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates
Repository: spark
Updated Branches:
  refs/heads/master 7dfad4b13 -> 37eb9184f

[SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates

## What changes were proposed in this pull request?

This patch fixes a minor correctness issue impacting the pushdown of filters beneath aggregates. Specifically, if a filter condition references no grouping or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed beneath an aggregate.

Intuitively, the only case where you can push a filter beneath an aggregate is when that filter is deterministic and is defined over the grouping columns / expressions, since in that case the filter is acting to exclude entire groups from the query (like a `HAVING` clause). The existing code would only push deterministic filters beneath aggregates when all of the filter's references were grouping columns, but this logic missed the case where a filter has no references. For example, `WHERE false` is deterministic but is independent of the actual data.

This patch fixes this minor bug by adding a new check to ensure that we don't push filters beneath aggregates when those filters don't reference any columns.

## How was this patch tested?

New regression test in FilterPushdownSuite.

Author: Josh Rosen

Closes #15289 from JoshRosen/SPARK-17712.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37eb9184
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37eb9184
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37eb9184

Branch: refs/heads/master
Commit: 37eb9184f1e9f1c07142c66936671f4711ef407d
Parents: 7dfad4b
Author: Josh Rosen
Authored: Wed Sep 28 19:03:05 2016 -0700
Committer: Herman van Hovell
Committed: Wed Sep 28 19:03:05 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 17 +
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0df16b7..4952ba3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -710,7 +710,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        replaced.references.subsetOf(aggregate.child.outputSet)
+        cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
       }
       val stayUp = rest ++ containingNonDeterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 55836f9..019f132 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

+  test("SPARK-17712: aggregate: don't push down filters that are data-independent") {
+    val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
+      .select('a, 'b)
+      .groupBy('a)(count('a))
+      .where(false)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .groupBy('a)(count('a))
+      .where(false)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("broadcast hint") {
     val originalQuery = BroadcastHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
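A minimal sketch of the hazard the patch closes, assuming a SparkSession `spark` with an empty table `t` (names hypothetical, not part of the patch):

```
import org.apache.spark.sql.functions.{count, lit}

// A global count over empty input still yields one row, so a filter that
// references no columns must be evaluated *above* the aggregate.
val agg = spark.table("t").agg(count(lit(1)).as("cnt"))
agg.where(lit(false)).count()   // 0 rows: `false` removes the single [0] row

// Had `false` been pushed beneath the aggregate -- which the old check
// allowed, since an empty reference set is a subset of any outputSet --
// the plan would aggregate an empty filtered input and still emit the
// single row [0], a spurious result.
```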
spark git commit: [SPARK-17710][HOTFIX] Fix ClassCircularityError in ReplSuite tests in Maven build: use 'Class.forName' instead of 'Utils.classForName'
Repository: spark
Updated Branches:
  refs/heads/master 7d0923202 -> 7dfad4b13

[SPARK-17710][HOTFIX] Fix ClassCircularityError in ReplSuite tests in Maven build: use 'Class.forName' instead of 'Utils.classForName'

## What changes were proposed in this pull request?

Fix ClassCircularityError in ReplSuite tests when Spark is built with the Maven build.

## How was this patch tested?

(1)

```
build/mvn -DskipTests -Phadoop-2.3 -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl -Pmesos clean package
```

Then test:

```
build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.repl.ReplSuite test
```

ReplSuite tests passed.

(2) Manual tests against some Spark applications in YARN client mode and YARN cluster mode, checking that Spark caller contexts are written into the HDFS hdfs-audit.log and the YARN RM audit log successfully.

Author: Weiqing Yang

Closes #15286 from Sherry302/SPARK-16757.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dfad4b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dfad4b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dfad4b1

Branch: refs/heads/master
Commit: 7dfad4b132bc46263ef788ced4a935862f5c8756
Parents: 7d09232
Author: Weiqing Yang
Authored: Wed Sep 28 20:20:03 2016 -0500
Committer: Tom Graves
Committed: Wed Sep 28 20:20:03 2016 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/Utils.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7dfad4b1/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa768c..f3493bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2489,8 +2489,10 @@ private[spark] class CallerContext(
   def setCurrentContext(): Boolean = {
     var succeed = false
     try {
-      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
-      val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
+      // scalastyle:off classforname
+      val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
+      val Builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+      // scalastyle:on classforname
       val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
       val hdfsContext = Builder.getMethod("build").invoke(builderInst)
       callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
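A reduced sketch of the reflective pattern this hotfix touches, assuming a Hadoop 2.8+ class on the classpath (the context string is hypothetical; the `ClassNotFoundException` path is how Spark tolerates older Hadoop versions):

```
// Mirror of the pattern in the diff: load Hadoop's CallerContext reflectively
// so Spark still runs against Hadoop versions that lack the class. Plain
// Class.forName resolves via the caller's defining class loader, whereas
// Utils.classForName routes through Spark's context class loader -- the
// indirection that appears to trigger ClassCircularityError in ReplSuite.
try {
  // scalastyle:off classforname
  val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
  val builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
  // scalastyle:on classforname
  val builderInst = builder.getConstructor(classOf[String]).newInstance("SPARK_app_1234") // hypothetical context
  val hdfsContext = builder.getMethod("build").invoke(builderInst)
  callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
} catch {
  case _: ClassNotFoundException => // pre-2.8 Hadoop: caller context unsupported
}
```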
[1/2] spark git commit: Preparing Spark release v2.0.1-rc4
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a69477a1 -> 7d612a7d5

Preparing Spark release v2.0.1-rc4

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/933d2c1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/933d2c1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/933d2c1e

Branch: refs/heads/branch-2.0
Commit: 933d2c1ea4e5f5c4ec8d375b5ccaa4577ba4be38
Parents: 0a69477
Author: Patrick Wendell
Authored: Wed Sep 28 16:27:45 2016 -0700
Committer: Patrick Wendell
Committed: Wed Sep 28 16:27:45 2016 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          | 2 +-
 assembly/pom.xml                           | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 4 ++--
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/java8-tests/pom.xml               | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 launcher/pom.xml                           | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 repl/pom.xml                               | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 yarn/pom.xml                               | 2 +-
 36 files changed, 37 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index dfb7e22..3e49eac 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.2
+Version: 2.0.1
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ca6daa2..6db3a59 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index c727f54..269b845 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index e335a89..20cf29e 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 8e64f56..25cc328 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>
[2/2] spark git commit: Preparing development version 2.0.2-SNAPSHOT
Preparing development version 2.0.2-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d612a7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d612a7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d612a7d

Branch: refs/heads/branch-2.0
Commit: 7d612a7d5277183d3bee3882a687c76dc8ea0e9a
Parents: 933d2c1
Author: Patrick Wendell
Authored: Wed Sep 28 16:27:54 2016 -0700
Committer: Patrick Wendell
Committed: Wed Sep 28 16:27:54 2016 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          | 2 +-
 assembly/pom.xml                           | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 4 ++--
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/java8-tests/pom.xml               | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 launcher/pom.xml                           | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 repl/pom.xml                               | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 yarn/pom.xml                               | 2 +-
 36 files changed, 37 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 3e49eac..dfb7e22 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.1
+Version: 2.0.2
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 6db3a59..ca6daa2 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 269b845..c727f54 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 20cf29e..e335a89 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 25cc328..8e64f56 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/sketch/pom.xml
[spark] Git Push Summary
Repository: spark
Updated Tags:  refs/tags/v2.0.1-rc4 [created] 933d2c1ea
spark git commit: [SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d358298f1 -> 0a69477a1

[SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

## What changes were proposed in this pull request?

We added native versions of `collect_set` and `collect_list` in Spark 2.0. These currently also (try to) collect null values, which is different from the original Hive implementation. This PR fixes this by adding a null check to the `Collect.update` method.

## How was this patch tested?

Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell

Closes #15208 from hvanhovell/SPARK-17641.

(cherry picked from commit 7d09232028967978d9db314ec041a762599f636b)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a69477a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a69477a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a69477a

Branch: refs/heads/branch-2.0
Commit: 0a69477a10adb3969a20ae870436299ef5152788
Parents: d358298
Author: Herman van Hovell
Authored: Wed Sep 28 16:25:10 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 16:25:31 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/aggregate/collect.scala    |  7 ++-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala  | 12 
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0a69477a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 896ff61..78a388d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -65,7 +65,12 @@ abstract class Collect extends ImperativeAggregate {
   }

   override def update(b: MutableRow, input: InternalRow): Unit = {
-    buffer += child.eval(input)
+    // Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
+    // See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
+    val value = child.eval(input)
+    if (value != null) {
+      buffer += value
+    }
   }

   override def merge(buffer: MutableRow, input: InternalRow): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a69477a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index cb505ac..3454caf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -477,6 +477,18 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     assert(error.message.contains("collect_set() cannot have map type data"))
   }

+  test("SPARK-17641: collect functions should not collect null values") {
+    val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")
+    checkAnswer(
+      df.select(collect_list($"a"), collect_list($"b")),
+      Seq(Row(Seq("1", "1"), Seq(2, 2, 4)))
+    )
+    checkAnswer(
+      df.select(collect_set($"a"), collect_set($"b")),
+      Seq(Row(Seq("1"), Seq(2, 4)))
+    )
+  }
+
   test("SPARK-14664: Decimal sum/avg over window should work.") {
     checkAnswer(
       spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
spark git commit: [SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.
Repository: spark
Updated Branches:
  refs/heads/master 557d6e322 -> 7d0923202

[SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

## What changes were proposed in this pull request?

We added native versions of `collect_set` and `collect_list` in Spark 2.0. These currently also (try to) collect null values, which is different from the original Hive implementation. This PR fixes this by adding a null check to the `Collect.update` method.

## How was this patch tested?

Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell

Closes #15208 from hvanhovell/SPARK-17641.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d092320
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d092320
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d092320

Branch: refs/heads/master
Commit: 7d09232028967978d9db314ec041a762599f636b
Parents: 557d6e3
Author: Herman van Hovell
Authored: Wed Sep 28 16:25:10 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 16:25:10 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/aggregate/collect.scala    |  7 ++-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala  | 12 
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7d092320/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 896ff61..78a388d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -65,7 +65,12 @@ abstract class Collect extends ImperativeAggregate {
   }

   override def update(b: MutableRow, input: InternalRow): Unit = {
-    buffer += child.eval(input)
+    // Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
+    // See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
+    val value = child.eval(input)
+    if (value != null) {
+      buffer += value
+    }
   }

   override def merge(buffer: MutableRow, input: InternalRow): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d092320/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 0e172be..7aa4f00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -477,6 +477,18 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     assert(error.message.contains("collect_set() cannot have map type data"))
   }

+  test("SPARK-17641: collect functions should not collect null values") {
+    val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")
+    checkAnswer(
+      df.select(collect_list($"a"), collect_list($"b")),
+      Seq(Row(Seq("1", "1"), Seq(2, 2, 4)))
+    )
+    checkAnswer(
+      df.select(collect_set($"a"), collect_set($"b")),
+      Seq(Row(Seq("1"), Seq(2, 4)))
+    )
+  }
+
   test("SPARK-14664: Decimal sum/avg over window should work.") {
     checkAnswer(
       spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
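A quick usage sketch of the post-patch semantics, mirroring the regression test above (assumes a SparkSession with `spark.implicits._` in scope):

```
import org.apache.spark.sql.functions.{collect_list, collect_set}
import spark.implicits._

val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")
df.select(collect_list($"a")).first.getSeq[String](0)  // Seq("1", "1"): the null row is skipped
df.select(collect_set($"a")).first.getSeq[String](0)   // Seq("1")
// Before this patch, the native Spark 2.0 implementations also buffered the
// null, diverging from Hive's collect_list/collect_set.
```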
spark git commit: [SPARK-17713][SQL] Move row-datasource related tests out of JDBCSuite
Repository: spark
Updated Branches:
  refs/heads/master a6cfa3f38 -> 557d6e322

[SPARK-17713][SQL] Move row-datasource related tests out of JDBCSuite

## What changes were proposed in this pull request?

As a followup for https://github.com/apache/spark/pull/15273 we should move non-JDBC specific tests out of that suite.

## How was this patch tested?

Ran the test.

Author: Eric Liang

Closes #15287 from ericl/spark-17713.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/557d6e32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/557d6e32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/557d6e32

Branch: refs/heads/master
Commit: 557d6e32272dee4eaa0f426cc3e2f82ea361c3da
Parents: a6cfa3f
Author: Eric Liang
Authored: Wed Sep 28 16:20:49 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 16:20:49 2016 -0700

----------------------------------------------------------------------
 .../RowDataSourceStrategySuite.scala            | 72 
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  8 ---
 2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/557d6e32/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
new file mode 100644
index 000..d9afa46
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+class RowDataSourceStrategySuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext {
+  import testImplicits._
+
+  val url = "jdbc:h2:mem:testdb0"
+  val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
+  var conn: java.sql.Connection = null
+
+  before {
+    Utils.classForName("org.h2.Driver")
+    // Extra properties that will be specified for our database. We need these to test
+    // usage of parameters from OPTIONS clause in queries.
+    val properties = new Properties()
+    properties.setProperty("user", "testUser")
+    properties.setProperty("password", "testPass")
+    properties.setProperty("rowId", "false")
+
+    conn = DriverManager.getConnection(url, properties)
+    conn.prepareStatement("create schema test").executeUpdate()
+    conn.prepareStatement("create table test.inttypes (a INT, b INT, c INT)").executeUpdate()
+    conn.prepareStatement("insert into test.inttypes values (1, 2, 3)").executeUpdate()
+    conn.commit()
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE inttypes
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url', dbtable 'TEST.INTTYPES', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
+  }
+
+  after {
+    conn.close()
+  }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes")
+    val df1 = df.groupBy("a").agg("b" -> "min")
+    val df2 = df.groupBy("a").agg("c" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/557d6e32/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4c694e452 -> d358298f1

[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

This backports https://github.com/apache/spark/pull/15273 to branch-2.0.

Also verified the test passes after the patch was applied.

rxin

Author: Eric Liang

Closes #15282 from ericl/spark-17673-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d358298f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d358298f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d358298f

Branch: refs/heads/branch-2.0
Commit: d358298f1082edd31489a1b08f428c8e60278d69
Parents: 4c694e4
Author: Eric Liang
Authored: Wed Sep 28 16:19:06 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 16:19:06 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSourceStrategy.scala | 5 -
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    | 8 
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6b4b3b8..2779694 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -347,13 +347,16 @@ object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

+    // These metadata values make scan plans uniquely identifiable for equality checking.
+    // TODO(SPARK-17701) using strings for equality checking is brittle
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]

       if (pushedFilters.nonEmpty) {
         pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
       }
-
+      pairs += ("ReadSchema" ->
+        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
       pairs.toMap
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ec419e4..1a6dba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -770,4 +770,12 @@ class JDBCSuite extends SparkFunSuite
     val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+    val df1 = df.groupBy("a").agg("c" -> "min")
+    val df2 = df.groupBy("a").agg("d" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }
spark git commit: [SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e2ce0caed -> b999fa43e

[SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.

From the original commit message:

This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0).

Author: Charles Allen

(cherry picked from commit 2eaeafe8a2aa31be9b230b8d53d3baccd32535b1)

Author: Charles Allen

Closes #15270 from vanzin/SPARK-17696.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b999fa43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b999fa43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b999fa43

Branch: refs/heads/branch-1.6
Commit: b999fa43ea0b509341ac2e130cc3787e5f8a75e5
Parents: e2ce0ca
Author: Charles Allen
Authored: Wed Sep 28 14:39:50 2016 -0700
Committer: Shixiong Zhu
Committed: Wed Sep 28 14:39:50 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b999fa43/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c2ebf30..47ce667 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor

 import java.net.URL
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean

 import org.apache.hadoop.conf.Configuration

@@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

+  private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None

@@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend(
     }

     case StopExecutor =>
+      stopping.set(true)
       logInfo("Driver commanded a shutdown")
       // Cannot shutdown here because an ack may need to be sent back to the caller. So send
       // a message to self to actually do the shutdown.
       self.send(Shutdown)

     case Shutdown =>
+      stopping.set(true)
       executor.stop()
       stop()
       rpcEnv.shutdown()
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-    if (driver.exists(_.address == remoteAddress)) {
+    if (stopping.get()) {
+      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+    } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
       System.exit(1)
     } else {
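The core of the backport, as a reduced sketch (surrounding RPC plumbing and the Shutdown handler elided):

```
// A flag set on StopExecutor/Shutdown lets onDisconnected distinguish an
// orderly, driver-commanded shutdown (exit status 0) from a genuine driver
// loss (exit status 1), closing the race described in the commit message.
private[this] val stopping = new java.util.concurrent.atomic.AtomicBoolean(false)

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  if (stopping.get()) {
    logInfo(s"Driver from $remoteAddress disconnected during shutdown")  // expected path
  } else if (driver.exists(_.address == remoteAddress)) {
    logError(s"Driver $remoteAddress disassociated! Shutting down.")
    System.exit(1)
  }
}
```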
spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan
Repository: spark
Updated Branches:
  refs/heads/master 46d1203bf -> a6cfa3f38

[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan

## What changes were proposed in this pull request?

It seems the equality check for reuse of `RowDataSourceScanExec` nodes doesn't respect the output schema. This can cause self-joins or unions over the same underlying data source to return incorrect results if they select different fields.

## How was this patch tested?

New unit test passes after the fix.

Author: Eric Liang

Closes #15273 from ericl/spark-17673.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6cfa3f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6cfa3f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6cfa3f3

Branch: refs/heads/master
Commit: a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a
Parents: 46d1203
Author: Eric Liang
Authored: Wed Sep 28 13:22:45 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 13:22:45 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSourceStrategy.scala | 4 
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    | 8 
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 63f01c5..693b4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -340,6 +340,8 @@ object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

+    // These metadata values make scan plans uniquely identifiable for equality checking.
+    // TODO(SPARK-17701) using strings for equality checking is brittle
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]

@@ -350,6 +352,8 @@ object DataSourceStrategy extends Strategy with Logging {
         }
         pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
       }
+      pairs += ("ReadSchema" ->
+        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
       pairs.toMap
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..c94cb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -791,4 +791,12 @@ class JDBCSuite extends SparkFunSuite
     val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+    val df1 = df.groupBy("a").agg("c" -> "min")
+    val df2 = df.groupBy("a").agg("d" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }
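A hedged repro sketch of the symptom (table and column names taken from the new test; assumes the test's `inttypes` fixture):

```
// Two aggregates over the same scan that project different columns: before
// the fix, RowDataSourceScanExec equality ignored the output schema, so the
// ReuseExchange rule could substitute the min(c) exchange for the min(d)
// one, and the union collapsed to a single distinct row instead of two.
val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
val res = df.groupBy("a").agg("c" -> "min")
  .union(df.groupBy("a").agg("d" -> "min"))
assert(res.distinct().count() == 2)  // 1 under the buggy reuse
```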
spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4d73d5cd8 -> 4c694e452

[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

| Time | Thread 1, Job 1                | Thread 2, Job 2                 |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed |                                 |
| 2 | failedStages += failedStage    |                                 |
| 3 |                                | task failed due to FetchFailed  |
| 4 |                                | can not post ResubmitFailedStages because failedStages is not empty |

Then job 2 of thread 2 never resubmits the failed stage and hangs. We should not add the failedStages when abortStage for fetch failure.

Added unit test.

Author: w00228970
Author: wangfei

Closes #15213 from scwf/dag-resubmit.

(cherry picked from commit 46d1203bf2d01b219c4efc7e0e77a844c0c664da)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c694e45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c694e45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c694e45

Branch: refs/heads/branch-2.0
Commit: 4c694e452278e46231720e778a80c586b9e565f1
Parents: 4d73d5c
Author: w00228970
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu
Committed: Wed Sep 28 12:08:56 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala     | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 399d671..e7e2ff1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1277,18 +1277,20 @@ class DAGScheduler(
             s"has failed the maximum allowable number of " +
             s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
             s"Most recent failure reason: ${failureMessage}", None)
-        } else if (failedStages.isEmpty) {
-          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-          // in that case the event will already have been scheduled.
-          // TODO: Cancel running tasks in the stage
-          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-            s"$failedStage (${failedStage.name}) due to fetch failure")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+        } else {
+          if (failedStages.isEmpty) {
+            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+            // in that case the event will already have been scheduled.
+            // TODO: Cancel running tasks in the stage
+            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+              s"$failedStage (${failedStage.name}) due to fetch failure")
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          failedStages += failedStage
+          failedStages += mapStage
         }
-        failedStages += failedStage
-        failedStages += mapStage
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {
           mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f69e8f2..5c35302 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler

 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean

 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import
spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
Repository: spark
Updated Branches:
  refs/heads/master 219003775 -> 46d1203bf

[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

## What changes were proposed in this pull request?

| Time | Thread 1, Job 1                | Thread 2, Job 2                 |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed |                                 |
| 2 | failedStages += failedStage    |                                 |
| 3 |                                | task failed due to FetchFailed  |
| 4 |                                | can not post ResubmitFailedStages because failedStages is not empty |

Then job 2 of thread 2 never resubmits the failed stage and hangs. We should not add the failedStages when abortStage for fetch failure.

## How was this patch tested?

Added unit test.

Author: w00228970
Author: wangfei

Closes #15213 from scwf/dag-resubmit.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d1203b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d1203b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d1203b

Branch: refs/heads/master
Commit: 46d1203bf2d01b219c4efc7e0e77a844c0c664da
Parents: 2190037
Author: w00228970
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu
Committed: Wed Sep 28 12:02:59 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala     | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48..f251740 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
             s"has failed the maximum allowable number of " +
             s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
             s"Most recent failure reason: ${failureMessage}", None)
-        } else if (failedStages.isEmpty) {
-          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-          // in that case the event will already have been scheduled.
-          // TODO: Cancel running tasks in the stage
-          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-            s"$failedStage (${failedStage.name}) due to fetch failure")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+        } else {
+          if (failedStages.isEmpty) {
+            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+            // in that case the event will already have been scheduled.
+            // TODO: Cancel running tasks in the stage
+            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+              s"$failedStage (${failedStage.name}) due to fetch failure")
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          failedStages += failedStage
+          failedStages += mapStage
         }
-        failedStages += failedStage
-        failedStages += mapStage
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {
           mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6787b30..bec95d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler

 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean

 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import
spark git commit: [MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation
Repository: spark
Updated Branches:
  refs/heads/master b2a7eedcd -> 219003775

[MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

## What changes were proposed in this pull request?

This PR proposes to fix wrongly indented examples in PySpark documentation

```
->>> json_sdf = spark.readStream.format("json")\
-        .schema(sdf_schema)\
-        .load(tempfile.mkdtemp())
+>>> json_sdf = spark.readStream.format("json") \\
+...     .schema(sdf_schema) \\
+...     .load(tempfile.mkdtemp())
```

```
-people.filter(people.age > 30).join(department, people.deptId == department.id)\
+people.filter(people.age > 30).join(department, people.deptId == department.id) \\
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
-    LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
+...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
-    LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])),
+...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
-...     for x in iterator:
-...          print(x)
+...     for x in iterator:
+...         print(x)
```

## How was this patch tested?

Manually tested.

**Before**

- https://cloud.githubusercontent.com/assets/6477701/18834471/05c7a478-8431-11e6-94bb-09aa37b12ddb.png
- https://cloud.githubusercontent.com/assets/6477701/18834472/06c8735c-8431-11e6-8775-78631eab0411.png
- https://cloud.githubusercontent.com/assets/6477701/18861294/29c0d5b4-84bf-11e6-99c5-3c9d913c125d.png
- https://cloud.githubusercontent.com/assets/6477701/18861298/31694cd8-84bf-11e6-9e61-9888cb8c2089.png
- https://cloud.githubusercontent.com/assets/6477701/18861301/359722da-84bf-11e6-97f9-5f5365582d14.png

**After**

- https://cloud.githubusercontent.com/assets/6477701/18834467/0367f9da-8431-11e6-86d9-a490d3297339.png
- https://cloud.githubusercontent.com/assets/6477701/18834463/f870fae0-8430-11e6-9482-01fc47898492.png
- https://cloud.githubusercontent.com/assets/6477701/18861305/3ff88b88-84bf-11e6-902c-9f725e8a8b10.png
- https://cloud.githubusercontent.com/assets/6477701/18863053/592fbc74-84ca-11e6-8dbf-99cf57947de8.png
- https://cloud.githubusercontent.com/assets/6477701/18863060/601607be-84ca-11e6-80aa-a401df41c321.png

Author: hyukjinkwon

Closes #15242 from HyukjinKwon/minor-example-pyspark.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21900377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21900377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21900377

Branch: refs/heads/master
Commit: 2190037757a81d3172f75227f7891d968e1f0d90
Parents: b2a7eed
Author: hyukjinkwon
Authored: Wed Sep 28 06:19:04 2016 -0400
Committer: Sean Owen
Committed: Wed Sep 28 06:19:04 2016 -0400

----------------------------------------------------------------------
 python/pyspark/mllib/util.py    | 8 
 python/pyspark/rdd.py           | 4 ++--
 python/pyspark/sql/dataframe.py | 2 +-
 python/pyspark/sql/streaming.py | 6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/21900377/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 48867a0..ed6fd4b 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -140,8 +140,8 @@ class MLUtils(object):
         >>> from pyspark.mllib.regression import LabeledPoint
         >>> from glob import glob
         >>> from pyspark.mllib.util import MLUtils
-        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
-            LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
+        ...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
         >>> tempFile = NamedTemporaryFile(delete=True)
         >>> tempFile.close()
         >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
@@ -166,8 +166,8 @@ class MLUtils(object):
         >>> from tempfile import NamedTemporaryFile
         >>> from pyspark.mllib.util import
spark git commit: [MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1b02f8820 -> 4d73d5cd8

[MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

## What changes were proposed in this pull request?

This PR proposes to fix wrongly indented examples in PySpark documentation

```
->>> json_sdf = spark.readStream.format("json")\
-        .schema(sdf_schema)\
-        .load(tempfile.mkdtemp())
+>>> json_sdf = spark.readStream.format("json") \\
+...     .schema(sdf_schema) \\
+...     .load(tempfile.mkdtemp())
```

```
-people.filter(people.age > 30).join(department, people.deptId == department.id)\
+people.filter(people.age > 30).join(department, people.deptId == department.id) \\
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
-    LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
+...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
-    LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])),
+...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
-...     for x in iterator:
-...          print(x)
+...     for x in iterator:
+...         print(x)
```

## How was this patch tested?

Manually tested.

**Before**

- https://cloud.githubusercontent.com/assets/6477701/18834471/05c7a478-8431-11e6-94bb-09aa37b12ddb.png
- https://cloud.githubusercontent.com/assets/6477701/18834472/06c8735c-8431-11e6-8775-78631eab0411.png
- https://cloud.githubusercontent.com/assets/6477701/18861294/29c0d5b4-84bf-11e6-99c5-3c9d913c125d.png
- https://cloud.githubusercontent.com/assets/6477701/18861298/31694cd8-84bf-11e6-9e61-9888cb8c2089.png
- https://cloud.githubusercontent.com/assets/6477701/18861301/359722da-84bf-11e6-97f9-5f5365582d14.png

**After**

- https://cloud.githubusercontent.com/assets/6477701/18834467/0367f9da-8431-11e6-86d9-a490d3297339.png
- https://cloud.githubusercontent.com/assets/6477701/18834463/f870fae0-8430-11e6-9482-01fc47898492.png
- https://cloud.githubusercontent.com/assets/6477701/18861305/3ff88b88-84bf-11e6-902c-9f725e8a8b10.png
- https://cloud.githubusercontent.com/assets/6477701/18863053/592fbc74-84ca-11e6-8dbf-99cf57947de8.png
- https://cloud.githubusercontent.com/assets/6477701/18863060/601607be-84ca-11e6-80aa-a401df41c321.png

Author: hyukjinkwon

Closes #15242 from HyukjinKwon/minor-example-pyspark.

(cherry picked from commit 2190037757a81d3172f75227f7891d968e1f0d90)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d73d5cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d73d5cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d73d5cd

Branch: refs/heads/branch-2.0
Commit: 4d73d5cd82ebc980f996c78f9afb8a97418ab7ab
Parents: 1b02f88
Author: hyukjinkwon
Authored: Wed Sep 28 06:19:04 2016 -0400
Committer: Sean Owen
Committed: Wed Sep 28 06:19:18 2016 -0400

----------------------------------------------------------------------
 python/pyspark/mllib/util.py    | 8 
 python/pyspark/rdd.py           | 4 ++--
 python/pyspark/sql/dataframe.py | 2 +-
 python/pyspark/sql/streaming.py | 6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4d73d5cd/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 48867a0..ed6fd4b 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -140,8 +140,8 @@ class MLUtils(object):
         >>> from pyspark.mllib.regression import LabeledPoint
         >>> from glob import glob
         >>> from pyspark.mllib.util import MLUtils
-        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
-            LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
+        ...     LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
         >>> tempFile = NamedTemporaryFile(delete=True)
         >>> tempFile.close()
         >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
@@ -166,8 +166,8 @@
spark git commit: [SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib feature selection docs for ChiSqSelector
Repository: spark
Updated Branches:
  refs/heads/master 4a8339568 -> b2a7eedcd

[SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib feature selection docs for ChiSqSelector

## What changes were proposed in this pull request?

A follow up for #14597 to update feature selection docs about ChiSqSelector.

## How was this patch tested?

Generated html docs. It can be previewed at:

* ml: http://sparkdocs.lins05.pw/spark-17017/ml-features.html#chisqselector
* mllib: http://sparkdocs.lins05.pw/spark-17017/mllib-feature-extraction.html#chisqselector

Author: Shuai Lin

Closes #15236 from lins05/spark-17017-update-docs-for-chisq-selector-fpr.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2a7eedc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2a7eedc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2a7eedc

Branch: refs/heads/master
Commit: b2a7eedcddf0e682ff46afd1b764d0b81ccdf395
Parents: 4a83395
Author: Shuai Lin
Authored: Wed Sep 28 06:12:48 2016 -0400
Committer: Sean Owen
Committed: Wed Sep 28 06:12:48 2016 -0400

----------------------------------------------------------------------
 docs/ml-features.md              | 14 ++
 docs/mllib-feature-extraction.md | 14 ++
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b2a7eedc/docs/ml-features.md
----------------------------------------------------------------------
diff --git a/docs/ml-features.md b/docs/ml-features.md
index a39b31c..a7f710f 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1331,10 +1331,16 @@ for more details on the API.
 ## ChiSqSelector

 `ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with
-categorical features. ChiSqSelector orders features based on a
-[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test)
-from the class, and then filters (selects) the top features which the class label depends on the
-most. This is akin to yielding the features with the most predictive power.
+categorical features. ChiSqSelector uses the
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
+features to choose. It supports three selection methods: `KBest`, `Percentile` and `FPR`:
+
+* `KBest` chooses the `k` top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
+* `Percentile` is similar to `KBest` but chooses a fraction of all features instead of a fixed number.
+* `FPR` chooses all features whose false positive rate meets some threshold.
+
+By default, the selection method is `KBest`, the default number of top features is 50. User can use
+`setNumTopFeatures`, `setPercentile` and `setAlpha` to set different selection methods.

 **Examples**

http://git-wip-us.apache.org/repos/asf/spark/blob/b2a7eedc/docs/mllib-feature-extraction.md
----------------------------------------------------------------------
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 353d391..87e1e02 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -225,10 +225,16 @@ features for use in model construction. It reduces the size of the feature space
 both speed and statistical learning behavior.

 [`ChiSqSelector`](api/scala/index.html#org.apache.spark.mllib.feature.ChiSqSelector) implements
-Chi-Squared feature selection. It operates on labeled data with categorical features.
-`ChiSqSelector` orders features based on a Chi-Squared test of independence from the class,
-and then filters (selects) the top features which the class label depends on the most.
-This is akin to yielding the features with the most predictive power.
+Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
+features to choose. It supports three selection methods: `KBest`, `Percentile` and `FPR`:
+
+* `KBest` chooses the `k` top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
+* `Percentile` is similar to `KBest` but chooses a fraction of all features instead of a fixed number.
+* `FPR` chooses all features whose false positive rate meets some threshold.
+
+By default, the selection method is `KBest`, the default number of top features is 50. User can use
+`setNumTopFeatures`, `setPercentile` and `setAlpha` to set different selection methods.

 The number of features to select can be tuned using a held-out validation set.
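A hedged usage sketch of the three selection methods, based only on the setters named in the updated docs (`data` is an assumed `RDD[LabeledPoint]` with categorical features):

```
import org.apache.spark.mllib.feature.ChiSqSelector

val kBest      = new ChiSqSelector().setNumTopFeatures(50) // KBest: top 50 features (the default method)
val percentile = new ChiSqSelector().setPercentile(0.1)    // Percentile: top 10% of features
val fpr        = new ChiSqSelector().setAlpha(0.05)        // FPR: features passing a 0.05 false-positive-rate threshold

val model = kBest.fit(data)                                // data: RDD[LabeledPoint], assumed
val selected = data.map(lp => model.transform(lp.features))
```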
spark git commit: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans (backport)
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2cd327ef5 -> 1b02f8820

[SPARK-17666] Ensure that RecordReaders are closed by data source file scans (backport)

This is a branch-2.0 backport of #15245.

## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

## How was this patch tested?

Tested manually for now.

Author: Josh Rosen

Closes #15271 from JoshRosen/SPARK-17666-backport.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b02f882
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b02f882
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b02f882

Branch: refs/heads/branch-2.0
Commit: 1b02f8820ddaf3f2a0e7acc9a7f27afc20683cca
Parents: 2cd327e
Author: Josh Rosen
Authored: Wed Sep 28 00:59:00 2016 -0700
Committer: Reynold Xin
Committed: Wed Sep 28 00:59:00 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  7 +--
 .../datasources/HadoopFileLinesReader.scala     |  6 +-
 .../datasources/RecordReaderIterator.scala      | 21 ++--
 .../datasources/csv/CSVFileFormat.scala         |  5 -
 .../datasources/json/JsonFileFormat.scala       |  5 -
 .../datasources/parquet/ParquetFileFormat.scala |  3 ++-
 .../datasources/text/TextFileFormat.scala       |  2 ++
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  6 +-
 8 files changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 034223e..ac95b92 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

+import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.mllib.util.MLUtils
@@ -160,8 +161,10 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

     (file: PartitionedFile) => {
-      val points =
-        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+
+      val points = linesReader
           .map(_.toString.trim)
           .filterNot(line => line.isEmpty || line.startsWith("#"))
           .map { line =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 18f9b55..83cf26c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution.datasources

+import java.io.Closeable
 import java.net.URI

 import org.apache.hadoop.conf.Configuration
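The cleanup pattern the patch threads through every `FileFormat.buildReader()` implementation, as a minimal sketch (grounded in the LibSVM hunk above):

```
// Register a task-completion callback so the reader is closed even when the
// task abandons the iterator early (e.g. under a LIMIT), preventing leaked
// file handles and S3 connections.
val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
// ... consume linesReader as before; close() is now guaranteed at task completion.
```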