spark git commit: [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates

2016-09-28 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 7dfad4b13 -> 37eb9184f


[SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath 
aggregates

## What changes were proposed in this pull request?

This patch fixes a minor correctness issue impacting the pushdown of filters 
beneath aggregates. Specifically, if a filter condition references no grouping 
or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed 
beneath an aggregate.

Intuitively, the only case where you can push a filter beneath an aggregate is 
when that filter is deterministic and is defined over the grouping columns / 
expressions, since in that case the filter is acting to exclude entire groups 
from the query (like a `HAVING` clause). The existing code would only push 
deterministic filters beneath aggregates when all of the filter's references 
were grouping columns, but this logic missed the case where a filter has no 
references. For example, `WHERE false` is deterministic but is independent of 
the actual data.

This patch fixes this minor bug by adding a new check to ensure that we don't 
push filters beneath aggregates when those filters don't reference any columns.
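
To see why such a pushdown is not semantics-preserving, consider a global (non-grouped) aggregate: it emits one row even over empty input, so a data-independent filter sitting above the aggregate is not equivalent to the same filter placed beneath it. A minimal sketch of the idea (not part of the patch; it assumes a `SparkSession` named `spark` and a table `t`, and the subquery aliases are made up — whether an end-to-end query actually hits the bug depends on the other optimizer rules that run):

```
// Filter kept above the aggregate: the single count row is discarded -> 0 rows.
spark.sql("SELECT * FROM (SELECT count(*) AS cnt FROM t) agg WHERE 1 = 0").show()

// The same filter pushed beneath the aggregate: count over empty input still
// produces one row with cnt = 0 -> 1 row.
spark.sql("SELECT count(*) AS cnt FROM (SELECT * FROM t WHERE 1 = 0) filtered").show()
```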

## How was this patch tested?

New regression test in FilterPushdownSuite.

Author: Josh Rosen 

Closes #15289 from JoshRosen/SPARK-17712.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37eb9184
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37eb9184
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37eb9184

Branch: refs/heads/master
Commit: 37eb9184f1e9f1c07142c66936671f4711ef407d
Parents: 7dfad4b
Author: Josh Rosen 
Authored: Wed Sep 28 19:03:05 2016 -0700
Committer: Herman van Hovell 
Committed: Wed Sep 28 19:03:05 2016 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 17 +
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0df16b7..4952ba3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -710,7 +710,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
 
   val (pushDown, rest) = candidates.partition { cond =>
 val replaced = replaceAlias(cond, aliasMap)
-replaced.references.subsetOf(aggregate.child.outputSet)
+cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
   }
 
   val stayUp = rest ++ containingNonDeterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 55836f9..019f132 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-17712: aggregate: don't push down filters that are 
data-independent") {
+val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
+  .select('a, 'b)
+  .groupBy('a)(count('a))
+  .where(false)
+
+val optimized = Optimize.execute(originalQuery.analyze)
+
+val correctAnswer = testRelation
+  .select('a, 'b)
+  .groupBy('a)(count('a))
+  .where(false)
+  .analyze
+
+comparePlans(optimized, correctAnswer)
+  }
+
   test("broadcast hint") {
 val originalQuery = BroadcastHint(testRelation)
   .where('a === 2L && 'b + Rand(10).as("rnd") === 3)





spark git commit: [SPARK-17710][HOTFIX] Fix ClassCircularityError in ReplSuite tests in Maven build: use 'Class.forName' instead of 'Utils.classForName'

2016-09-28 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 7d0923202 -> 7dfad4b13


[SPARK-17710][HOTFIX] Fix ClassCircularityError in ReplSuite tests in Maven 
build: use 'Class.forName' instead of 'Utils.classForName'

## What changes were proposed in this pull request?
Fix ClassCircularityError in ReplSuite tests when Spark is built with Maven.
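
For context, Spark's `Utils.classForName` resolves the class through the thread context class loader (falling back to the loader that loaded Spark itself), while a bare `Class.forName` uses the defining class loader of the calling class; the hotfix switches the `CallerContext` reflection to the latter. A rough sketch of the difference, not taken from the patch:

```
// Roughly what Utils.classForName does: resolve via the context class loader,
// falling back to Spark's own loader.
val viaContextLoader = Class.forName(
  "org.apache.hadoop.ipc.CallerContext",
  /* initialize = */ true,
  Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader))

// What the hotfix uses instead: plain Class.forName, i.e. the caller's own
// defining class loader.
val viaCallerLoader = Class.forName("org.apache.hadoop.ipc.CallerContext")
```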

## How was this patch tested?
(1)
```
build/mvn -DskipTests -Phadoop-2.3 -Pyarn -Phive -Phive-thriftserver 
-Pkinesis-asl -Pmesos clean package
```
Then test:
```
build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.repl.ReplSuite test
```
ReplSuite tests passed

(2)
Manual tests against some Spark applications in YARN client mode and YARN
cluster mode, checking that the Spark caller contexts are written into the
HDFS hdfs-audit.log and the YARN RM audit log.

Author: Weiqing Yang 

Closes #15286 from Sherry302/SPARK-16757.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dfad4b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dfad4b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dfad4b1

Branch: refs/heads/master
Commit: 7dfad4b132bc46263ef788ced4a935862f5c8756
Parents: 7d09232
Author: Weiqing Yang 
Authored: Wed Sep 28 20:20:03 2016 -0500
Committer: Tom Graves 
Committed: Wed Sep 28 20:20:03 2016 -0500

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7dfad4b1/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa768c..f3493bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2489,8 +2489,10 @@ private[spark] class CallerContext(
   def setCurrentContext(): Boolean = {
 var succeed = false
 try {
-  val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
+  // scalastyle:off classforname
+  val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
+  val Builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+  // scalastyle:on classforname
   val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
   val hdfsContext = Builder.getMethod("build").invoke(builderInst)
   callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)





[1/2] spark git commit: Preparing Spark release v2.0.1-rc4

2016-09-28 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a69477a1 -> 7d612a7d5


Preparing Spark release v2.0.1-rc4


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/933d2c1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/933d2c1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/933d2c1e

Branch: refs/heads/branch-2.0
Commit: 933d2c1ea4e5f5c4ec8d375b5ccaa4577ba4be38
Parents: 0a69477
Author: Patrick Wendell 
Authored: Wed Sep 28 16:27:45 2016 -0700
Committer: Patrick Wendell 
Committed: Wed Sep 28 16:27:45 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 36 files changed, 37 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index dfb7e22..3e49eac 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.2
+Version: 2.0.1
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ca6daa2..6db3a59 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2-SNAPSHOT
+2.0.1
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index c727f54..269b845 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2-SNAPSHOT
+2.0.1
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index e335a89..20cf29e 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2-SNAPSHOT
+2.0.1
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/933d2c1e/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 8e64f56..25cc328 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2-SNAPSHOT
+2.0.1
 ../../pom.xml
   
 


[2/2] spark git commit: Preparing development version 2.0.2-SNAPSHOT

2016-09-28 Thread pwendell
Preparing development version 2.0.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d612a7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d612a7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d612a7d

Branch: refs/heads/branch-2.0
Commit: 7d612a7d5277183d3bee3882a687c76dc8ea0e9a
Parents: 933d2c1
Author: Patrick Wendell 
Authored: Wed Sep 28 16:27:54 2016 -0700
Committer: Patrick Wendell 
Committed: Wed Sep 28 16:27:54 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 36 files changed, 37 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 3e49eac..dfb7e22 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.1
+Version: 2.0.2
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 6db3a59..ca6daa2 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.1
+2.0.2-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 269b845..c727f54 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.1
+2.0.2-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 20cf29e..e335a89 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.1
+2.0.2-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 25cc328..8e64f56 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.1
+2.0.2-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d612a7d/common/sketch/pom.xml

[spark] Git Push Summary

2016-09-28 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v2.0.1-rc4 [created] 933d2c1ea




spark git commit: [SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d358298f1 -> 0a69477a1


[SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

## What changes were proposed in this pull request?
We added native versions of `collect_set` and `collect_list` in Spark 2.0.
These currently also (try to) collect null values, which differs from the
original Hive implementation. This PR fixes this by adding a null check to the
`Collect.update` method.
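
As a usage-level sketch of the new behavior (assuming an existing `SparkSession` named `spark` with its implicits imported; the data mirrors the regression test below):

```
import org.apache.spark.sql.functions.{collect_list, collect_set}
import spark.implicits._  // assumes `spark` is an existing SparkSession

val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")

df.select(collect_list($"a")).first()  // [1, 1]    -- the null in `a` is now skipped
df.select(collect_list($"b")).first()  // [2, 2, 4]
df.select(collect_set($"a")).first()   // [1]
df.select(collect_set($"b")).first()   // [2, 4]
```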

## How was this patch tested?
Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell 

Closes #15208 from hvanhovell/SPARK-17641.

(cherry picked from commit 7d09232028967978d9db314ec041a762599f636b)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a69477a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a69477a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a69477a

Branch: refs/heads/branch-2.0
Commit: 0a69477a10adb3969a20ae870436299ef5152788
Parents: d358298
Author: Herman van Hovell 
Authored: Wed Sep 28 16:25:10 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 16:25:31 2016 -0700

--
 .../sql/catalyst/expressions/aggregate/collect.scala|  7 ++-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala  | 12 
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a69477a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 896ff61..78a388d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -65,7 +65,12 @@ abstract class Collect extends ImperativeAggregate {
   }
 
   override def update(b: MutableRow, input: InternalRow): Unit = {
-buffer += child.eval(input)
+// Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
+// See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
+val value = child.eval(input)
+if (value != null) {
+  buffer += value
+}
   }
 
   override def merge(buffer: MutableRow, input: InternalRow): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a69477a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index cb505ac..3454caf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -477,6 +477,18 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
 assert(error.message.contains("collect_set() cannot have map type data"))
   }
 
+  test("SPARK-17641: collect functions should not collect null values") {
+val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")
+checkAnswer(
+  df.select(collect_list($"a"), collect_list($"b")),
+  Seq(Row(Seq("1", "1"), Seq(2, 2, 4)))
+)
+checkAnswer(
+  df.select(collect_set($"a"), collect_set($"b")),
+  Seq(Row(Seq("1"), Seq(2, 4)))
+)
+  }
+
   test("SPARK-14664: Decimal sum/avg over window should work.") {
 checkAnswer(
   spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),





spark git commit: [SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 557d6e322 -> 7d0923202


[SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.

## What changes were proposed in this pull request?
We added native versions of `collect_set` and `collect_list` in Spark 2.0.
These currently also (try to) collect null values, which differs from the
original Hive implementation. This PR fixes this by adding a null check to the
`Collect.update` method.

## How was this patch tested?
Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell 

Closes #15208 from hvanhovell/SPARK-17641.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d092320
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d092320
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d092320

Branch: refs/heads/master
Commit: 7d09232028967978d9db314ec041a762599f636b
Parents: 557d6e3
Author: Herman van Hovell 
Authored: Wed Sep 28 16:25:10 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 16:25:10 2016 -0700

--
 .../sql/catalyst/expressions/aggregate/collect.scala|  7 ++-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala  | 12 
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d092320/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 896ff61..78a388d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -65,7 +65,12 @@ abstract class Collect extends ImperativeAggregate {
   }
 
   override def update(b: MutableRow, input: InternalRow): Unit = {
-buffer += child.eval(input)
+// Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
+// See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
+val value = child.eval(input)
+if (value != null) {
+  buffer += value
+}
   }
 
   override def merge(buffer: MutableRow, input: InternalRow): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d092320/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 0e172be..7aa4f00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -477,6 +477,18 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
 assert(error.message.contains("collect_set() cannot have map type data"))
   }
 
+  test("SPARK-17641: collect functions should not collect null values") {
+val df = Seq(("1", 2), (null, 2), ("1", 4)).toDF("a", "b")
+checkAnswer(
+  df.select(collect_list($"a"), collect_list($"b")),
+  Seq(Row(Seq("1", "1"), Seq(2, 2, 4)))
+)
+checkAnswer(
+  df.select(collect_set($"a"), collect_set($"b")),
+  Seq(Row(Seq("1"), Seq(2, 4)))
+)
+  }
+
   test("SPARK-14664: Decimal sum/avg over window should work.") {
 checkAnswer(
   spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),





spark git commit: [SPARK-17713][SQL] Move row-datasource related tests out of JDBCSuite

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a6cfa3f38 -> 557d6e322


[SPARK-17713][SQL] Move row-datasource related tests out of JDBCSuite

## What changes were proposed in this pull request?

As a follow-up to https://github.com/apache/spark/pull/15273, we should move
non-JDBC-specific tests out of that suite.

## How was this patch tested?

Ran the test.

Author: Eric Liang 

Closes #15287 from ericl/spark-17713.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/557d6e32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/557d6e32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/557d6e32

Branch: refs/heads/master
Commit: 557d6e32272dee4eaa0f426cc3e2f82ea361c3da
Parents: a6cfa3f
Author: Eric Liang 
Authored: Wed Sep 28 16:20:49 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 16:20:49 2016 -0700

--
 .../RowDataSourceStrategySuite.scala| 72 
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  8 ---
 2 files changed, 72 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/557d6e32/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
new file mode 100644
index 000..d9afa46
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+class RowDataSourceStrategySuite extends SparkFunSuite with BeforeAndAfter 
with SharedSQLContext {
+  import testImplicits._
+
+  val url = "jdbc:h2:mem:testdb0"
+  val urlWithUserAndPass = 
"jdbc:h2:mem:testdb0;user=testUser;password=testPass"
+  var conn: java.sql.Connection = null
+
+  before {
+Utils.classForName("org.h2.Driver")
+// Extra properties that will be specified for our database. We need these 
to test
+// usage of parameters from OPTIONS clause in queries.
+val properties = new Properties()
+properties.setProperty("user", "testUser")
+properties.setProperty("password", "testPass")
+properties.setProperty("rowId", "false")
+
+conn = DriverManager.getConnection(url, properties)
+conn.prepareStatement("create schema test").executeUpdate()
+conn.prepareStatement("create table test.inttypes (a INT, b INT, c 
INT)").executeUpdate()
+conn.prepareStatement("insert into test.inttypes values (1, 2, 
3)").executeUpdate()
+conn.commit()
+sql(
+  s"""
+|CREATE TEMPORARY TABLE inttypes
+|USING org.apache.spark.sql.jdbc
+|OPTIONS (url '$url', dbtable 'TEST.INTTYPES', user 'testUser', 
password 'testPass')
+  """.stripMargin.replaceAll("\n", " "))
+  }
+
+  after {
+conn.close()
+  }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+val df = sql("SELECT * FROM inttypes")
+val df1 = df.groupBy("a").agg("b" -> "min")
+val df2 = df.groupBy("a").agg("c" -> "min")
+val res = df1.union(df2)
+assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/557d6e32/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4c694e452 -> d358298f1


[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

This backports https://github.com/apache/spark/pull/15273 to branch-2.0

Also verified the test passes after the patch was applied. rxin

Author: Eric Liang 

Closes #15282 from ericl/spark-17673-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d358298f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d358298f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d358298f

Branch: refs/heads/branch-2.0
Commit: d358298f1082edd31489a1b08f428c8e60278d69
Parents: 4c694e4
Author: Eric Liang 
Authored: Wed Sep 28 16:19:06 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 16:19:06 2016 -0700

--
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 -
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6b4b3b8..2779694 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -347,13 +347,16 @@ object DataSourceStrategy extends Strategy with Logging {
 // `Filter`s or cannot be handled by `relation`.
 val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+// These metadata values make scan plans uniquely identifiable for equality checking.
+// TODO(SPARK-17701) using strings for equality checking is brittle
 val metadata: Map[String, String] = {
   val pairs = ArrayBuffer.empty[(String, String)]
 
   if (pushedFilters.nonEmpty) {
 pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
   }
-
+  pairs += ("ReadSchema" ->
+StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
   pairs.toMap
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ec419e4..1a6dba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -770,4 +770,12 @@ class JDBCSuite extends SparkFunSuite
 val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
 assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+val df1 = df.groupBy("a").agg("c" -> "min")
+val df2 = df.groupBy("a").agg("d" -> "min")
+val res = df1.union(df2)
+assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }





spark git commit: [SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.

2016-09-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e2ce0caed -> b999fa43e


[SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.

From the original commit message:

This PR also fixes a regression caused by [SPARK-10987] whereby submitting a 
shutdown causes a race between the local shutdown procedure and the 
notification of the scheduler driver disconnection. If the scheduler driver 
disconnection wins the race, the coarse executor incorrectly exits with status 
1 (instead of the proper status 0)

Author: Charles Allen 

(cherry picked from commit 2eaeafe8a2aa31be9b230b8d53d3baccd32535b1)

Author: Charles Allen 

Closes #15270 from vanzin/SPARK-17696.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b999fa43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b999fa43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b999fa43

Branch: refs/heads/branch-1.6
Commit: b999fa43ea0b509341ac2e130cc3787e5f8a75e5
Parents: e2ce0ca
Author: Charles Allen 
Authored: Wed Sep 28 14:39:50 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Sep 28 14:39:50 2016 -0700

--
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b999fa43/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c2ebf30..47ce667 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.net.URL
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.hadoop.conf.Configuration
 
@@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
+  private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
 case StopExecutor =>
+  stopping.set(true)
   logInfo("Driver commanded a shutdown")
   // Cannot shutdown here because an ack may need to be sent back to the 
caller. So send
   // a message to self to actually do the shutdown.
   self.send(Shutdown)
 
 case Shutdown =>
+  stopping.set(true)
   executor.stop()
   stop()
   rpcEnv.shutdown()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-if (driver.exists(_.address == remoteAddress)) {
+if (stopping.get()) {
+  logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+} else if (driver.exists(_.address == remoteAddress)) {
   logError(s"Driver $remoteAddress disassociated! Shutting down.")
   System.exit(1)
 } else {





spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 46d1203bf -> a6cfa3f38


[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan

## What changes were proposed in this pull request?

It seems the equality check for reuse of `RowDataSourceScanExec` nodes doesn't 
respect the output schema. This can cause self-joins or unions over the same 
underlying data source to return incorrect results if they select different 
fields.
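
A sketch of the failure pattern the new test exercises, assuming a table `src` backed by a row data source (e.g. a JDBC relation) with columns `a`, `b` and `c`, and an existing `SparkSession` named `spark`:

```
val df  = spark.table("src")
val df1 = df.groupBy("a").agg("b" -> "min")   // this scan only needs columns (a, b)
val df2 = df.groupBy("a").agg("c" -> "min")   // this scan only needs columns (a, c)

// The two scans differ only in their output schema. Before this fix their plans
// compared equal, so exchange reuse could substitute one scan's output for the
// other's, and the union below could return too few distinct rows.
df1.union(df2).distinct().count()
```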

## How was this patch tested?

New unit test passes after the fix.

Author: Eric Liang 

Closes #15273 from ericl/spark-17673.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6cfa3f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6cfa3f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6cfa3f3

Branch: refs/heads/master
Commit: a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a
Parents: 46d1203
Author: Eric Liang 
Authored: Wed Sep 28 13:22:45 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 13:22:45 2016 -0700

--
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 4 
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 63f01c5..693b4c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -340,6 +340,8 @@ object DataSourceStrategy extends Strategy with Logging {
 // `Filter`s or cannot be handled by `relation`.
 val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+// These metadata values make scan plans uniquely identifiable for equality checking.
+// TODO(SPARK-17701) using strings for equality checking is brittle
 val metadata: Map[String, String] = {
   val pairs = ArrayBuffer.empty[(String, String)]
 
@@ -350,6 +352,8 @@ object DataSourceStrategy extends Strategy with Logging {
 }
 pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
   }
+  pairs += ("ReadSchema" ->
+StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
   pairs.toMap
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..c94cb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -791,4 +791,12 @@ class JDBCSuite extends SparkFunSuite
 val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
 assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+val df1 = df.groupBy("a").agg("c" -> "min")
+val df2 = df.groupBy("a").agg("d" -> "min")
+val res = df1.union(df2)
+assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }





spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

2016-09-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4d73d5cd8 -> 4c694e452


[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

| Time | Thread 1, Job 1 | Thread 2, Job 2 |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed | |
| 2 | failedStages += failedStage | |
| 3 | | task failed due to FetchFailed |
| 4 | | can not post ResubmitFailedStages because failedStages is not empty |

Then job 2 of thread 2 never resubmits the failed stage and hangs.

We should not add the failed stages to `failedStages` when calling `abortStage` for a fetch failure.

added unit test

Author: w00228970 
Author: wangfei 

Closes #15213 from scwf/dag-resubmit.

(cherry picked from commit 46d1203bf2d01b219c4efc7e0e77a844c0c664da)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c694e45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c694e45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c694e45

Branch: refs/heads/branch-2.0
Commit: 4c694e452278e46231720e778a80c586b9e565f1
Parents: 4d73d5c
Author: w00228970 
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Sep 28 12:08:56 2016 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 399d671..e7e2ff1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1277,18 +1277,20 @@ class DAGScheduler(
   s"has failed the maximum allowable number of " +
   s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
   s"Most recent failure reason: ${failureMessage}", None)
-  } else if (failedStages.isEmpty) {
-// Don't schedule an event to resubmit failed stages if failed isn't empty, because
-// in that case the event will already have been scheduled.
-// TODO: Cancel running tasks in the stage
-logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-  s"$failedStage (${failedStage.name}) due to fetch failure")
-messageScheduler.schedule(new Runnable {
-  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  } else {
+if (failedStages.isEmpty) {
+  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+  // in that case the event will already have been scheduled.
+  // TODO: Cancel running tasks in the stage
+  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure")
+  messageScheduler.schedule(new Runnable {
+override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+}
+failedStages += failedStage
+failedStages += mapStage
   }
-  failedStages += failedStage
-  failedStages += mapStage
   // Mark the map whose fetch failed as broken in the map stage
   if (mapId != -1) {
 mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f69e8f2..5c35302 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import 

spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

2016-09-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 219003775 -> 46d1203bf


[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

## What changes were proposed in this pull request?
| Time | Thread 1, Job 1 | Thread 2, Job 2 |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed | |
| 2 | failedStages += failedStage | |
| 3 | | task failed due to FetchFailed |
| 4 | | can not post ResubmitFailedStages because failedStages is not empty |

Then job 2 of thread 2 never resubmits the failed stage and hangs.

We should not add the failed stages to `failedStages` when calling `abortStage` for a fetch failure.
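
Paraphrasing the patch, the bookkeeping now only happens on the resubmit path; `shouldAbort` below is a stand-in name for the real abort condition (see the diff further down for the actual change):

```
if (shouldAbort) {
  abortStage(failedStage, abortMessage, None)
} else {
  if (failedStages.isEmpty) {
    // Only schedule a ResubmitFailedStages event if none is already pending.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  // Record the stages only when they will actually be resubmitted,
  // never on the abort path.
  failedStages += failedStage
  failedStages += mapStage
}
```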

## How was this patch tested?

added unit test

Author: w00228970 
Author: wangfei 

Closes #15213 from scwf/dag-resubmit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d1203b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d1203b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d1203b

Branch: refs/heads/master
Commit: 46d1203bf2d01b219c4efc7e0e77a844c0c664da
Parents: 2190037
Author: w00228970 
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Sep 28 12:02:59 2016 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48..f251740 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
   s"has failed the maximum allowable number of " +
   s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
   s"Most recent failure reason: ${failureMessage}", None)
-  } else if (failedStages.isEmpty) {
-// Don't schedule an event to resubmit failed stages if failed isn't empty, because
-// in that case the event will already have been scheduled.
-// TODO: Cancel running tasks in the stage
-logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-  s"$failedStage (${failedStage.name}) due to fetch failure")
-messageScheduler.schedule(new Runnable {
-  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  } else {
+if (failedStages.isEmpty) {
+  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+  // in that case the event will already have been scheduled.
+  // TODO: Cancel running tasks in the stage
+  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure")
+  messageScheduler.schedule(new Runnable {
+override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+}
+failedStages += failedStage
+failedStages += mapStage
   }
-  failedStages += failedStage
-  failedStages += mapStage
   // Mark the map whose fetch failed as broken in the map stage
   if (mapId != -1) {
 mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6787b30..bec95d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import 

spark git commit: [MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

2016-09-28 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master b2a7eedcd -> 219003775


[MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

## What changes were proposed in this pull request?

This PR proposes to fix wrongly indented examples in PySpark documentation

```
->>> json_sdf = spark.readStream.format("json")\
-   .schema(sdf_schema)\
-   .load(tempfile.mkdtemp())
+>>> json_sdf = spark.readStream.format("json") \\
+... .schema(sdf_schema) \\
+... .load(tempfile.mkdtemp())
```

```
-people.filter(people.age > 30).join(department, people.deptId == 
department.id)\
+people.filter(people.age > 30).join(department, people.deptId == 
department.id) \\
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 
4.56e-7)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 
4.56e-7)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
-...  for x in iterator:
-...   print(x)
+... for x in iterator:
+...  print(x)
```

## How was this patch tested?

Manually tested.

**Before**

![2016-09-26 8 36 02](https://cloud.githubusercontent.com/assets/6477701/18834471/05c7a478-8431-11e6-94bb-09aa37b12ddb.png)

![2016-09-26 9 22 16](https://cloud.githubusercontent.com/assets/6477701/18834472/06c8735c-8431-11e6-8775-78631eab0411.png)

https://cloud.githubusercontent.com/assets/6477701/18861294/29c0d5b4-84bf-11e6-99c5-3c9d913c125d.png

https://cloud.githubusercontent.com/assets/6477701/18861298/31694cd8-84bf-11e6-9e61-9888cb8c2089.png

https://cloud.githubusercontent.com/assets/6477701/18861301/359722da-84bf-11e6-97f9-5f5365582d14.png

**After**

![2016-09-26 9 29 47](https://cloud.githubusercontent.com/assets/6477701/18834467/0367f9da-8431-11e6-86d9-a490d3297339.png)

![2016-09-26 9 30 24](https://cloud.githubusercontent.com/assets/6477701/18834463/f870fae0-8430-11e6-9482-01fc47898492.png)

https://cloud.githubusercontent.com/assets/6477701/18861305/3ff88b88-84bf-11e6-902c-9f725e8a8b10.png

https://cloud.githubusercontent.com/assets/6477701/18863053/592fbc74-84ca-11e6-8dbf-99cf57947de8.png

https://cloud.githubusercontent.com/assets/6477701/18863060/601607be-84ca-11e6-80aa-a401df41c321.png

Author: hyukjinkwon 

Closes #15242 from HyukjinKwon/minor-example-pyspark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21900377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21900377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21900377

Branch: refs/heads/master
Commit: 2190037757a81d3172f75227f7891d968e1f0d90
Parents: b2a7eed
Author: hyukjinkwon 
Authored: Wed Sep 28 06:19:04 2016 -0400
Committer: Sean Owen 
Committed: Wed Sep 28 06:19:04 2016 -0400

--
 python/pyspark/mllib/util.py| 8 
 python/pyspark/rdd.py   | 4 ++--
 python/pyspark/sql/dataframe.py | 2 +-
 python/pyspark/sql/streaming.py | 6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21900377/python/pyspark/mllib/util.py
--
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 48867a0..ed6fd4b 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -140,8 +140,8 @@ class MLUtils(object):
 >>> from pyspark.mllib.regression import LabeledPoint
 >>> from glob import glob
 >>> from pyspark.mllib.util import MLUtils
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
 >>> tempFile = NamedTemporaryFile(delete=True)
 >>> tempFile.close()
 >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
@@ -166,8 +166,8 @@ class MLUtils(object):
 >>> from tempfile import NamedTemporaryFile
 >>> from pyspark.mllib.util import 

spark git commit: [MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

2016-09-28 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1b02f8820 -> 4d73d5cd8


[MINOR][PYSPARK][DOCS] Fix examples in PySpark documentation

## What changes were proposed in this pull request?

This PR proposes to fix wrongly indented examples in PySpark documentation

```
->>> json_sdf = spark.readStream.format("json")\
-   .schema(sdf_schema)\
-   .load(tempfile.mkdtemp())
+>>> json_sdf = spark.readStream.format("json") \\
+... .schema(sdf_schema) \\
+... .load(tempfile.mkdtemp())
```

```
-people.filter(people.age > 30).join(department, people.deptId == 
department.id)\
+people.filter(people.age > 30).join(department, people.deptId == 
department.id) \\
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 
4.56e-7)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 
4.56e-7)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
```

```
-...  for x in iterator:
-...   print(x)
+... for x in iterator:
+...  print(x)
```

## How was this patch tested?

Manually tested.

**Before**

![2016-09-26 8 36 02](https://cloud.githubusercontent.com/assets/6477701/18834471/05c7a478-8431-11e6-94bb-09aa37b12ddb.png)

![2016-09-26 9 22 16](https://cloud.githubusercontent.com/assets/6477701/18834472/06c8735c-8431-11e6-8775-78631eab0411.png)

https://cloud.githubusercontent.com/assets/6477701/18861294/29c0d5b4-84bf-11e6-99c5-3c9d913c125d.png

https://cloud.githubusercontent.com/assets/6477701/18861298/31694cd8-84bf-11e6-9e61-9888cb8c2089.png

https://cloud.githubusercontent.com/assets/6477701/18861301/359722da-84bf-11e6-97f9-5f5365582d14.png

**After**

![2016-09-26 9 29 47](https://cloud.githubusercontent.com/assets/6477701/18834467/0367f9da-8431-11e6-86d9-a490d3297339.png)

![2016-09-26 9 30 24](https://cloud.githubusercontent.com/assets/6477701/18834463/f870fae0-8430-11e6-9482-01fc47898492.png)

https://cloud.githubusercontent.com/assets/6477701/18861305/3ff88b88-84bf-11e6-902c-9f725e8a8b10.png

https://cloud.githubusercontent.com/assets/6477701/18863053/592fbc74-84ca-11e6-8dbf-99cf57947de8.png

https://cloud.githubusercontent.com/assets/6477701/18863060/601607be-84ca-11e6-80aa-a401df41c321.png

Author: hyukjinkwon 

Closes #15242 from HyukjinKwon/minor-example-pyspark.

(cherry picked from commit 2190037757a81d3172f75227f7891d968e1f0d90)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d73d5cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d73d5cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d73d5cd

Branch: refs/heads/branch-2.0
Commit: 4d73d5cd82ebc980f996c78f9afb8a97418ab7ab
Parents: 1b02f88
Author: hyukjinkwon 
Authored: Wed Sep 28 06:19:04 2016 -0400
Committer: Sean Owen 
Committed: Wed Sep 28 06:19:18 2016 -0400

--
 python/pyspark/mllib/util.py| 8 
 python/pyspark/rdd.py   | 4 ++--
 python/pyspark/sql/dataframe.py | 2 +-
 python/pyspark/sql/streaming.py | 6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d73d5cd/python/pyspark/mllib/util.py
--
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 48867a0..ed6fd4b 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -140,8 +140,8 @@ class MLUtils(object):
 >>> from pyspark.mllib.regression import LabeledPoint
 >>> from glob import glob
 >>> from pyspark.mllib.util import MLUtils
->>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])), \
-LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 
4.56)])),
+... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
 >>> tempFile = NamedTemporaryFile(delete=True)
 >>> tempFile.close()
 >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
@@ -166,8 +166,8 @@ 

spark git commit: [SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib feature selection docs for ChiSqSelector

2016-09-28 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4a8339568 -> b2a7eedcd


[SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib feature selection docs 
for ChiSqSelector

## What changes were proposed in this pull request?

A follow-up to #14597 to update the feature selection docs for ChiSqSelector.
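
As a quick illustration of the selector the updated docs describe, here is a hedged sketch using the `ml` API with the default `KBest` method; the data and column names are made up, only `setNumTopFeatures` is exercised, and `setPercentile`/`setAlpha` are simply the setter names quoted in the doc text below:

```
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

// Assumes an existing SparkSession `spark`.
val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataFrame(data).toDF("id", "features", "clicked")

// Default selection method (KBest): keep the single most predictive feature.
val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

selector.fit(df).transform(df).show()
```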

## How was this patch tested?

Generated html docs. It can be previewed at:

* ml: http://sparkdocs.lins05.pw/spark-17017/ml-features.html#chisqselector
* mllib: http://sparkdocs.lins05.pw/spark-17017/mllib-feature-extraction.html#chisqselector

Author: Shuai Lin 

Closes #15236 from lins05/spark-17017-update-docs-for-chisq-selector-fpr.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2a7eedc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2a7eedc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2a7eedc

Branch: refs/heads/master
Commit: b2a7eedcddf0e682ff46afd1b764d0b81ccdf395
Parents: 4a83395
Author: Shuai Lin 
Authored: Wed Sep 28 06:12:48 2016 -0400
Committer: Sean Owen 
Committed: Wed Sep 28 06:12:48 2016 -0400

--
 docs/ml-features.md  | 14 ++
 docs/mllib-feature-extraction.md | 14 ++
 2 files changed, 20 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b2a7eedc/docs/ml-features.md
--
diff --git a/docs/ml-features.md b/docs/ml-features.md
index a39b31c..a7f710f 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1331,10 +1331,16 @@ for more details on the API.
 ## ChiSqSelector
 
 `ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with
-categorical features. ChiSqSelector orders features based on a
-[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test)
-from the class, and then filters (selects) the top features which the class label depends on the
-most. This is akin to yielding the features with the most predictive power.
+categorical features. ChiSqSelector uses the
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
+features to choose. It supports three selection methods: `KBest`, `Percentile` and `FPR`:
+
+* `KBest` chooses the `k` top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
+* `Percentile` is similar to `KBest` but chooses a fraction of all features instead of a fixed number.
+* `FPR` chooses all features whose false positive rate meets some threshold.
+
+By default, the selection method is `KBest`, the default number of top features is 50. User can use
+`setNumTopFeatures`, `setPercentile` and `setAlpha` to set different selection methods.
 
 **Examples**
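
For readers skimming this digest, a minimal sketch of the `spark.ml` usage described in the added text may help. This is an illustrative sketch, not part of the commit: it assumes the Spark 2.x `org.apache.spark.ml.feature.ChiSqSelector` API with the `setNumTopFeatures` setter named above, and the column names and data values are made up.

```scala
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object ChiSqSelectorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ChiSqSelectorSketch").getOrCreate()

    // Tiny labeled dataset; feature values and the "clicked" label are illustrative only.
    val df = spark.createDataFrame(Seq(
      (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
      (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
      (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
    )).toDF("id", "features", "clicked")

    // KBest-style selection: keep the single feature most predictive of the label.
    // setPercentile / setAlpha would switch to the other methods listed in the doc.
    val selector = new ChiSqSelector()
      .setNumTopFeatures(1)
      .setFeaturesCol("features")
      .setLabelCol("clicked")
      .setOutputCol("selectedFeatures")

    selector.fit(df).transform(df).show()
    spark.stop()
  }
}
```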
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b2a7eedc/docs/mllib-feature-extraction.md
--
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 353d391..87e1e02 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -225,10 +225,16 @@ features for use in model construction. It reduces the size of the feature space
 both speed and statistical learning behavior.
 
 
[`ChiSqSelector`](api/scala/index.html#org.apache.spark.mllib.feature.ChiSqSelector) implements
-Chi-Squared feature selection. It operates on labeled data with categorical features.
-`ChiSqSelector` orders features based on a Chi-Squared test of independence from the class,
-and then filters (selects) the top features which the class label depends on the most.
-This is akin to yielding the features with the most predictive power.
+Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
+features to choose. It supports three selection methods: `KBest`, `Percentile` and `FPR`:
+
+* `KBest` chooses the `k` top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
+* `Percentile` is similar to `KBest` but chooses a fraction of all features instead of a fixed number.
+* `FPR` chooses all features whose false positive rate meets some threshold.
+
+By default, the selection method is `KBest`, the default number of top features is 50. User can use
+`setNumTopFeatures`, `setPercentile` and `setAlpha` to set different selection methods.
 
 The number of features to select can be tuned using a held-out validation set.
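
The RDD-based `spark.mllib` selector described above can be exercised with a similar sketch. Again, this is illustrative only and not part of the commit; it assumes the long-standing `new ChiSqSelector(numTopFeatures)` constructor with `fit`/`transform`, and the data values are made up.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Select the single most predictive feature from a toy RDD of labeled points.
def selectTopFeature(sc: SparkContext): RDD[LabeledPoint] = {
  val data = sc.parallelize(Seq(
    LabeledPoint(0.0, Vectors.dense(8.0, 7.0, 0.0)),
    LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 6.0)),
    LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 8.0))
  ))
  val model = new ChiSqSelector(1).fit(data) // KBest-style selection
  data.map(lp => LabeledPoint(lp.label, model.transform(lp.features)))
}
```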
 



spark git commit: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans (backport)

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2cd327ef5 -> 1b02f8820


[SPARK-17666] Ensure that RecordReaders are closed by data source file scans 
(backport)

This is a branch-2.0 backport of #15245.

## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file 
scans. As reported in 
[SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which 
do not fully-consume their input may cause file handles / network connections 
(e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext 
callback to [close its record 
readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208),
 but the new data source file scans will only close record readers once their 
iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add 
`close()` methods and modifies all six implementations of 
`FileFormat.buildReader()` to register TaskContext task completion callbacks to 
guarantee that cleanup is eventually performed.
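
The heart of the change is the pattern visible in the diff below: expose a `close()` on the reader and register a `TaskContext` completion callback so the resource is released even when the iterator is never fully drained. A minimal sketch of that pattern follows; the `LineReader` class is hypothetical and stands in for `RecordReaderIterator` / `HadoopFileLinesReader`.

```scala
import java.io.Closeable

import org.apache.spark.TaskContext

// Hypothetical Closeable line iterator, used only to illustrate the cleanup pattern.
class LineReader(path: String) extends Iterator[String] with Closeable {
  private val source = scala.io.Source.fromFile(path)
  private val lines = source.getLines()
  override def hasNext: Boolean = lines.hasNext
  override def next(): String = lines.next()
  override def close(): Unit = source.close()
}

def buildReader(path: String): Iterator[String] = {
  val reader = new LineReader(path)
  // Close the underlying handle when the task completes, even if the caller
  // abandons the iterator early (e.g. because of a LIMIT).
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
  reader
}
```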

## How was this patch tested?

Tested manually for now.

Author: Josh Rosen 

Closes #15271 from JoshRosen/SPARK-17666-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b02f882
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b02f882
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b02f882

Branch: refs/heads/branch-2.0
Commit: 1b02f8820ddaf3f2a0e7acc9a7f27afc20683cca
Parents: 2cd327e
Author: Josh Rosen 
Authored: Wed Sep 28 00:59:00 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 00:59:00 2016 -0700

--
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  7 +--
 .../datasources/HadoopFileLinesReader.scala |  6 +-
 .../datasources/RecordReaderIterator.scala  | 21 ++--
 .../datasources/csv/CSVFileFormat.scala |  5 -
 .../datasources/json/JsonFileFormat.scala   |  5 -
 .../datasources/parquet/ParquetFileFormat.scala |  3 ++-
 .../datasources/text/TextFileFormat.scala   |  2 ++
 .../spark/sql/hive/orc/OrcFileFormat.scala  |  6 +-
 8 files changed, 46 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 034223e..ac95b92 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
+import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.mllib.util.MLUtils
@@ -160,8 +161,10 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
   sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
 (file: PartitionedFile) => {
-  val points =
-new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+  val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+
+  val points = linesReader
   .map(_.toString.trim)
   .filterNot(line => line.isEmpty || line.startsWith("#"))
   .map { line =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 18f9b55..83cf26c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.Closeable
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration