spark git commit: [SPARK-24997][SQL] Enable support of MINUS ALL

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master b0d6967d4 -> 19a453191


[SPARK-24997][SQL] Enable support of MINUS ALL

## What changes were proposed in this pull request?
Enable support for MINUS ALL, which was previously blocked in AstBuilder.
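
As an illustration of the enabled syntax (not part of the original patch; the session setup and data are assumptions), MINUS ALL now parses to `Except(left, right, isAll = true)` and follows multiset semantics, like EXCEPT ALL:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("minus-all-demo").getOrCreate()
import spark.implicits._

Seq(1, 1, 2, 3).toDF("c").createOrReplaceTempView("t1")
Seq(1, 2).toDF("c").createOrReplaceTempView("t2")

// Before this change the next line failed with ParseException("MINUS ALL is not supported.").
// With multiset semantics one occurrence each of 1 and 2 is removed, leaving rows 1 and 3.
spark.sql("SELECT c FROM t1 MINUS ALL SELECT c FROM t2").show()
```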

## How was this patch tested?
Added tests in SQLQueryTestSuite and modified PlanParserSuite.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Dilip Biswal 

Closes #21963 from dilipbiswal/minus-all.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19a45319
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19a45319
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19a45319

Branch: refs/heads/master
Commit: 19a45319130d618a173f5f3b4dde59356b39089b
Parents: b0d6967
Author: Dilip Biswal 
Authored: Thu Aug 2 22:45:10 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 22:45:10 2018 -0700

--
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  11 +-
 .../sql/catalyst/parser/PlanParserSuite.scala   |   4 +-
 .../resources/sql-tests/inputs/except-all.sql   |  22 ++-
 .../sql-tests/results/except-all.sql.out| 147 +++
 4 files changed, 113 insertions(+), 71 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 0ceeb53..9906a30 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -517,11 +517,10 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
* Connect two queries by a Set operator.
*
* Supported Set operators are:
-   * - UNION [DISTINCT]
-   * - UNION ALL
-   * - EXCEPT [DISTINCT]
-   * - MINUS [DISTINCT]
-   * - INTERSECT [DISTINCT]
+   * - UNION [ DISTINCT | ALL ]
+   * - EXCEPT [ DISTINCT | ALL ]
+   * - MINUS [ DISTINCT | ALL ]
+   * - INTERSECT [DISTINCT | ALL]
*/
   override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = 
withOrigin(ctx) {
 val left = plan(ctx.left)
@@ -541,7 +540,7 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
   case SqlBaseParser.EXCEPT =>
 Except(left, right)
   case SqlBaseParser.SETMINUS if all =>
-throw new ParseException("MINUS ALL is not supported.", ctx)
+Except(left, right, isAll = true)
   case SqlBaseParser.SETMINUS =>
 Except(left, right)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 38efd89..9247004 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -67,11 +67,13 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual("select * from a union all select * from b", a.union(b))
 assertEqual("select * from a except select * from b", a.except(b))
 assertEqual("select * from a except distinct select * from b", a.except(b))
+assertEqual("select * from a except all select * from b", a.except(b, 
isAll = true))
 assertEqual("select * from a minus select * from b", a.except(b))
-intercept("select * from a minus all select * from b", "MINUS ALL is not 
supported.")
+assertEqual("select * from a minus all select * from b", a.except(b, isAll 
= true))
 assertEqual("select * from a minus distinct select * from b", a.except(b))
 assertEqual("select * from a intersect select * from b", a.intersect(b))
 assertEqual("select * from a intersect distinct select * from b", 
a.intersect(b))
+assertEqual("select * from a intersect all select * from b", 
a.intersect(b, isAll = true))
   }
 
   test("common table expressions") {

http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql 
b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
index 08b9a43..e28f072 

spark git commit: [SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f45d60a5a -> b0d6967d4


[SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs 
should not fail

## What changes were proposed in this pull request?
In the current master, `toString` throws an exception when `RelationalGroupedDataset` has unresolved expressions:
```
scala> spark.range(0, 10).groupBy("id")
res4: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: 
[grouping expressions: [id: bigint], value: [id: bigint], type: GroupBy]

scala> spark.range(0, 10).groupBy('id)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
dataType on unresolved object, tree: 'id
  at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at 
org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474)
  at 
org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473)
  at 
scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
```
This PR fixes the code to handle the unresolved case in `RelationalGroupedDataset.toString`.
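
A minimal sketch of the intended behaviour after the fix (the output shown is illustrative, not captured from a build), assuming an active `SparkSession` named `spark`:

```scala
import spark.implicits._  // for the 'id symbol syntax

// 'id stays unresolved until analysis, so the old toString threw UnresolvedException.
val grouped = spark.range(0, 10).groupBy('id)

// With this patch, unresolved grouping expressions are rendered by name only, e.g.
//   RelationalGroupedDataset: [grouping expressions: [id], value: [id: bigint], type: GroupBy]
println(grouped.toString)
```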

Closes #21752

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Chris Horn 
Author: Takeshi Yamamuro 

Closes #21964 from maropu/SPARK-24788.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0d6967d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0d6967d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0d6967d

Branch: refs/heads/master
Commit: b0d6967d45f3260ed4ee9b2a49f801d799e81283
Parents: f45d60a
Author: Chris Horn 
Authored: Thu Aug 2 22:40:58 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 22:40:58 2018 -0700

--
 .../org/apache/spark/sql/RelationalGroupedDataset.scala   |  7 +--
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 10 ++
 2 files changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b0d6967d/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 8412219..4e73b36 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -469,8 +469,11 @@ class RelationalGroupedDataset protected[sql](
   override def toString: String = {
 val builder = new StringBuilder
 builder.append("RelationalGroupedDataset: [grouping expressions: [")
-val kFields = groupingExprs.map(_.asInstanceOf[NamedExpression]).map {
-  case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+val kFields = groupingExprs.collect {
+  case expr: NamedExpression if expr.resolved =>
+s"${expr.name}: ${expr.dataType.simpleString(2)}"
+  case expr: NamedExpression => expr.name
+  case o => o.toString
 }
 builder.append(kFields.take(2).mkString(", "))
 if (kFields.length > 2) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d6967d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index f495a94..d0106c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -717,4 +717,14 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   Row(1, 2, 1) :: Row(2, 2, 2) :: Row(3, 2, 3) :: Nil)
   }
 
+  test("SPARK-24788: RelationalGroupedDataset.toString with unresolved exprs 
should not fail") {
+// Checks if these raise no 

spark git commit: [SPARK-25002][SQL] Avro: revise the output record namespace

2018-08-02 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 73dd6cf9b -> f45d60a5a


[SPARK-25002][SQL] Avro: revise the output record namespace

## What changes were proposed in this pull request?

Currently the output namespace starts with ".", e.g. `.topLevelRecord`.

Although this is valid according to the Avro spec, we should remove the leading dot to avoid failures when the output Avro file is read by other libraries:

https://github.com/linkedin/goavro/pull/96
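
The core of the change is how the child namespace is composed; a standalone sketch of the rule (mirroring the `SchemaConverters` hunk below):

```scala
// Namespace rule after this change: no leading dot for top-level records.
def childNameSpace(prevNameSpace: String, recordName: String): String =
  prevNameSpace match {
    case "" => recordName                      // e.g. "topLevelRecord" (was ".topLevelRecord")
    case _  => s"$prevNameSpace.$recordName"   // e.g. "topLevelRecord.data"
  }
```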

## How was this patch tested?

Unit test

Author: Gengliang Wang 

Closes #21974 from gengliangwang/avro_namespace.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f45d60a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f45d60a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f45d60a5

Branch: refs/heads/master
Commit: f45d60a5a1f1e97ecde36eda8202034d78f93d53
Parents: 73dd6cf
Author: Gengliang Wang 
Authored: Fri Aug 3 13:28:44 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Aug 3 13:28:44 2018 +0800

--
 .../apache/spark/sql/avro/SchemaConverters.scala   |  6 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala  | 17 +
 2 files changed, 22 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f45d60a5/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
--
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 1e91207..6929539 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -142,7 +142,11 @@ object SchemaConverters {
 builder.map()
   .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace, 
outputTimestampType))
   case st: StructType =>
-val nameSpace = s"$prevNameSpace.$recordName"
+val nameSpace = prevNameSpace match {
+  case "" => recordName
+  case _ => s"$prevNameSpace.$recordName"
+}
+
 val fieldsAssembler = 
builder.record(recordName).namespace(nameSpace).fields()
 st.foreach { f =>
   val fieldAvroType =

http://git-wip-us.apache.org/repos/asf/spark/blob/f45d60a5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
--
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 085c8c8..b4dcf6c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -884,6 +884,23 @@ class AvroSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
 
   case class NestedTop(id: Int, data: NestedMiddle)
 
+  test("Validate namespace in avro file that has nested records with the same 
name") {
+withTempPath { dir =>
+  val writeDf = spark.createDataFrame(List(NestedTop(1, NestedMiddle(2, 
NestedBottom(3, "1")
+  writeDf.write.format("avro").save(dir.toString)
+  val file = new File(dir.toString)
+.listFiles()
+.filter(_.isFile)
+.filter(_.getName.endsWith("avro"))
+.head
+  val reader = new DataFileReader(file, new GenericDatumReader[Any]())
+  val schema = reader.getSchema.toString()
+  assert(schema.contains("\"namespace\":\"topLevelRecord\""))
+  assert(schema.contains("\"namespace\":\"topLevelRecord.data\""))
+  assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\""))
+}
+  }
+
   test("saving avro that has nested records with the same name") {
 withTempPath { tempDir =>
   // Save avro file on output folder path





svn commit: r28523 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_02_22_01-8080c93-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Fri Aug  3 05:15:41 2018
New Revision: 28523

Log:
Apache Spark 2.3.3-SNAPSHOT-2018_08_02_22_01-8080c93 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24966][SQL] Implement precedence rules for set operations.

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master b3f2911ee -> 73dd6cf9b


[SPARK-24966][SQL] Implement precedence rules for set operations.

## What changes were proposed in this pull request?

Currently the set operations INTERSECT, UNION and EXCEPT are assigned the same 
precedence. This PR fixes the problem by giving INTERSECT  higher precedence 
than UNION and EXCEPT. UNION and EXCEPT operators are evaluated in the order in 
which they appear in the query from left to right.

This results in a change in behavior because the order in which set operators are evaluated in a query changes. The old behavior is still preserved under a newly added config parameter.

Query:
```
SELECT * FROM t1
UNION
SELECT * FROM t2
EXCEPT
SELECT * FROM t3
INTERSECT
SELECT * FROM t4
```
Parsed plan before the change:
```
== Parsed Logical Plan ==
'Intersect false
:- 'Except false
:  :- 'Distinct
:  :  +- 'Union
:  :     :- 'Project [*]
:  :     :  +- 'UnresolvedRelation `t1`
:  :     +- 'Project [*]
:  :        +- 'UnresolvedRelation `t2`
:  +- 'Project [*]
:     +- 'UnresolvedRelation `t3`
+- 'Project [*]
   +- 'UnresolvedRelation `t4`
```
Parsed plan after the change:
```
== Parsed Logical Plan ==
'Except false
:- 'Distinct
:  +- 'Union
:     :- 'Project [*]
:     :  +- 'UnresolvedRelation `t1`
:     +- 'Project [*]
:        +- 'UnresolvedRelation `t2`
+- 'Intersect false
   :- 'Project [*]
   :  +- 'UnresolvedRelation `t3`
   +- 'Project [*]
      +- 'UnresolvedRelation `t4`
```
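
As a hedged usage note (config name taken from the sql-programming-guide change below; `spark` is an assumed active session), the old left-to-right evaluation can be restored like this:

```scala
// Restore the pre-2.4 behaviour of evaluating all set operators left to right.
spark.conf.set("spark.sql.legacy.setopsPrecedence.enabled", "true")
// or equivalently in SQL:
spark.sql("SET spark.sql.legacy.setopsPrecedence.enabled=true")
```
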
## How was this patch tested?
Added tests in PlanParserSuite, SQLQueryTestSuite.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Dilip Biswal 

Closes #21941 from dilipbiswal/SPARK-24966.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73dd6cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73dd6cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73dd6cf9

Branch: refs/heads/master
Commit: 73dd6cf9b558f9d752e1f3c13584344257ad7863
Parents: b3f2911
Author: Dilip Biswal 
Authored: Thu Aug 2 22:04:17 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 22:04:17 2018 -0700

--
 docs/sql-programming-guide.md   |   1 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  15 ++-
 .../apache/spark/sql/catalyst/dsl/package.scala |   6 +-
 .../spark/sql/catalyst/parser/ParseDriver.scala |   2 +
 .../plans/logical/basicLogicalOperators.scala   |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  12 +++
 .../sql/catalyst/parser/PlanParserSuite.scala   |  45 
 .../spark/sql/execution/SparkStrategies.scala   |   4 +-
 .../sql-tests/inputs/intersect-all.sql  |  51 +++--
 .../sql-tests/results/intersect-all.sql.out | 104 +++
 10 files changed, 211 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73dd6cf9/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 0900f83..a1e019c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1876,6 +1876,7 @@ working with timestamps in `pandas_udf`s to get the best 
performance, see
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
+  - Since Spark 2.4, Spark will evaluate the set operations referenced in a 
query by following a precedence rule as per the SQL standard. If the order is 
not specified by parentheses, set operations are performed from left to right 
with the exception that all INTERSECT operations are performed before any 
UNION, EXCEPT or MINUS operations. The old behaviour of giving equal precedence 
to all the set operations is preserved under a newly added configuration 
`spark.sql.legacy.setopsPrecedence.enabled` with a default value of `false`. 
When this property is set to `true`, spark will evaluate the set operators from 
left to right as they appear in the query given no explicit ordering is 
enforced by usage of parentheses.
   - Since Spark 2.4, Spark will display table description column Last Access 
value as UNKNOWN when the value was Jan 01 1970.
   - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for 
ORC files by default. To do that, `spark.sql.orc.impl` and 
`spark.sql.orc.filterPushdown` change their default values to `native` and 
`true` respectively.
   - In PySpark, when Arrow optimization is enabled, previously `toPandas` just 
failed when Arrow optimization is unable to be used whereas `createDataFrame` 
from Pandas DataFrame allowed the fallback to non-optimization. Now, both 
`toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by 
default, which can be switched off by 

spark git commit: [PYSPARK] Updates to Accumulators

2018-08-02 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 5b187a85a -> 8080c937d


[PYSPARK] Updates to Accumulators

(cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8080c937
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8080c937
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8080c937

Branch: refs/heads/branch-2.3
Commit: 8080c937d3752aee2fd36f0045a057f7130f6fe4
Parents: 5b187a8
Author: LucaCanali 
Authored: Wed Jul 18 23:19:02 2018 +0200
Committer: Imran Rashid 
Committed: Thu Aug 2 21:05:03 2018 -0500

--
 .../org/apache/spark/api/python/PythonRDD.scala | 12 +++--
 python/pyspark/accumulators.py  | 53 +++-
 python/pyspark/context.py   |  5 +-
 3 files changed, 53 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8080c937/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a1ee2f7..8bc0ff7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -586,8 +586,9 @@ class BytesToString extends 
org.apache.spark.api.java.function.Function[Array[By
  */
 private[spark] class PythonAccumulatorV2(
 @transient private val serverHost: String,
-private val serverPort: Int)
-  extends CollectionAccumulator[Array[Byte]] {
+private val serverPort: Int,
+private val secretToken: String)
+  extends CollectionAccumulator[Array[Byte]] with Logging{
 
   Utils.checkHost(serverHost)
 
@@ -602,12 +603,17 @@ private[spark] class PythonAccumulatorV2(
   private def openSocket(): Socket = synchronized {
 if (socket == null || socket.isClosed) {
   socket = new Socket(serverHost, serverPort)
+  logInfo(s"Connected to AccumulatorServer at host: $serverHost port: 
$serverPort")
+  // send the secret just for the initial authentication when opening a 
new connection
+  
socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
 }
 socket
   }
 
   // Need to override so the types match with PythonFunction
-  override def copyAndReset(): PythonAccumulatorV2 = new 
PythonAccumulatorV2(serverHost, serverPort)
+  override def copyAndReset(): PythonAccumulatorV2 = {
+new PythonAccumulatorV2(serverHost, serverPort, secretToken)
+  }
 
   override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): 
Unit = synchronized {
 val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2]

http://git-wip-us.apache.org/repos/asf/spark/blob/8080c937/python/pyspark/accumulators.py
--
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 6ef8cf5..bc0be07 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -228,20 +228,49 @@ class 
_UpdateRequestHandler(SocketServer.StreamRequestHandler):
 
 def handle(self):
 from pyspark.accumulators import _accumulatorRegistry
-while not self.server.server_shutdown:
-# Poll every 1 second for new data -- don't block in case of 
shutdown.
-r, _, _ = select.select([self.rfile], [], [], 1)
-if self.rfile in r:
-num_updates = read_int(self.rfile)
-for _ in range(num_updates):
-(aid, update) = pickleSer._read_with_length(self.rfile)
-_accumulatorRegistry[aid] += update
-# Write a byte in acknowledgement
-self.wfile.write(struct.pack("!b", 1))
+auth_token = self.server.auth_token
+
+def poll(func):
+while not self.server.server_shutdown:
+# Poll every 1 second for new data -- don't block in case of 
shutdown.
+r, _, _ = select.select([self.rfile], [], [], 1)
+if self.rfile in r:
+if func():
+break
+
+def accum_updates():
+num_updates = read_int(self.rfile)
+for _ in range(num_updates):
+(aid, update) = pickleSer._read_with_length(self.rfile)
+_accumulatorRegistry[aid] += update
+# Write a byte in acknowledgement
+self.wfile.write(struct.pack("!b", 1))
+return False
+
+def authenticate_and_accum_updates():
+received_token = self.rfile.read(len(auth_token))
+if isinstance(received_token, bytes):
+  

svn commit: r28522 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_20_01-b3f2911-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Fri Aug  3 03:15:49 2018
New Revision: 28522

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_02_20_01-b3f2911 docs


[This commit notification would consist of 1470 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24945][SQL] Switching to uniVocity 2.7.3

2018-08-02 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 7cf16a7fa -> b3f2911ee


[SPARK-24945][SQL] Switching to uniVocity 2.7.3

## What changes were proposed in this pull request?

In the PR, I propose to upgrade uniVocity parser from **2.6.3** to **2.7.3**. 
The recent version includes a fix for the SPARK-24645 issue and has better 
performance.

Before changes:
```
Parsing quoted values:         Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
One quoted string              6 / 34122            0.0          666727.0       1.0X

Wide rows with 1000 columns:   Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
Select 1000 columns            90287 / 91713        0.0          90286.9        1.0X
Select 100 columns             31826 / 36589        0.0          31826.4        2.8X
Select one column              25738 / 25872        0.0          25737.9        3.5X
count()                        6931 / 7269          0.1          6931.5         13.0X
```
After changes:
```
Parsing quoted values:         Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
One quoted string              33411 / 33510        0.0          668211.4       1.0X

Wide rows with 1000 columns:   Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
Select 1000 columns            88028 / 89311        0.0          88028.1        1.0X
Select 100 columns             29010 / 32755        0.0          29010.1        3.0X
Select one column              22936 / 22953        0.0          22936.5        3.8X
count()                        6657 / 6740          0.2          6656.6         13.5X
```
Closes #21892

## How was this patch tested?

It was tested by `CSVSuite` and `CSVBenchmarks`.

Author: Maxim Gekk 

Closes #21969 from MaxGekk/univocity-2_7_3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3f2911e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3f2911e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3f2911e

Branch: refs/heads/master
Commit: b3f2911eebeb418631ce296f68a7cc68083659cd
Parents: 7cf16a7
Author: Maxim Gekk 
Authored: Fri Aug 3 08:33:28 2018 +0800
Committer: hyukjinkwon 
Committed: Fri Aug 3 08:33:28 2018 +0800

--
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 dev/deps/spark-deps-hadoop-3.1 | 2 +-
 sql/core/pom.xml   | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4ef61b2..54cdcfc 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -191,7 +191,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm6-shaded-4.8.jar
 xercesImpl-2.9.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index a74ce1f..fda13db 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -192,7 +192,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm6-shaded-4.8.jar
 xercesImpl-2.9.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-3.1
--
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index e0fcca0..90602fc 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -212,7 +212,7 @@ stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
 token-provider-1.0.1.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 woodstox-core-5.0.3.jar
 xbean-asm6-shaded-4.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/sql/core/pom.xml

spark git commit: [SPARK-24773] Avro: support logical timestamp type with different precisions

2018-08-02 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 29077a1d1 -> 7cf16a7fa


[SPARK-24773] Avro: support logical timestamp type with different precisions

## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use the DataFrame option `outputTimestampType` or the SQL config `spark.sql.avro.outputTimestampType`. The supported values are:
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`.
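
A short, hedged sketch of both knobs (option and config names as stated above; the session, data, and output path are illustrative):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("avro-timestamp-demo").getOrCreate()
import spark.implicits._

// Session-wide default precision for Avro output timestamps.
spark.conf.set("spark.sql.avro.outputTimestampType", "TIMESTAMP_MILLIS")

val df = Seq(Timestamp.valueOf("2018-08-02 22:00:00")).toDF("ts")

// Per-write override via the DataFrame option.
df.write
  .format("avro")
  .option("outputTimestampType", "TIMESTAMP_MICROS")
  .save("/tmp/avro_timestamp_demo")
```
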
## How was this patch tested?

Unit test

Author: Gengliang Wang 

Closes #21935 from gengliangwang/avro_timestamp.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7cf16a7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7cf16a7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7cf16a7f

Branch: refs/heads/master
Commit: 7cf16a7fa4eb4145c0c5d1dd2555f78a2fdd8d8b
Parents: 29077a1
Author: Gengliang Wang 
Authored: Fri Aug 3 08:32:08 2018 +0800
Committer: hyukjinkwon 
Committed: Fri Aug 3 08:32:08 2018 +0800

--
 .../spark/sql/avro/AvroDeserializer.scala   |  15 ++-
 .../apache/spark/sql/avro/AvroFileFormat.scala  |   4 +-
 .../org/apache/spark/sql/avro/AvroOptions.scala |  11 ++
 .../apache/spark/sql/avro/AvroSerializer.scala  |  12 ++-
 .../spark/sql/avro/SchemaConverters.scala   |  33 --
 external/avro/src/test/resources/timestamp.avro | Bin 0 -> 375 bytes
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 107 +--
 .../org/apache/spark/sql/internal/SQLConf.scala |  18 
 8 files changed, 178 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7cf16a7f/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
--
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b31149a..394a62b 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic._
 import org.apache.avro.util.Utf8
@@ -86,8 +87,18 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType) {
   case (LONG, LongType) => (updater, ordinal, value) =>
 updater.setLong(ordinal, value.asInstanceOf[Long])
 
-  case (LONG, TimestampType) => (updater, ordinal, value) =>
-updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+  case (LONG, TimestampType) => avroType.getLogicalType match {
+case _: TimestampMillis => (updater, ordinal, value) =>
+  updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+case _: TimestampMicros => (updater, ordinal, value) =>
+  updater.setLong(ordinal, value.asInstanceOf[Long])
+case null => (updater, ordinal, value) =>
+  // For backward compatibility, if the Avro type is Long and it is 
not logical type,
+  // the value is processed as timestamp type with millisecond 
precision.
+  updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+case other => throw new IncompatibleSchemaException(
+  s"Cannot convert Avro logical type ${other} to Catalyst Timestamp 
type.")
+  }
 
   case (LONG, DateType) => (updater, ordinal, value) =>
 updater.setInt(ordinal, (value.asInstanceOf[Long] / 
DateTimeUtils.MILLIS_PER_DAY).toInt)

http://git-wip-us.apache.org/repos/asf/spark/blob/7cf16a7f/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
--
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 6776516..6ffcf37 100755
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -113,8 +113,8 @@ private[avro] class AvroFileFormat extends FileFormat
   options: Map[String, String],
   dataSchema: StructType): OutputWriterFactory = {
 val parsedOptions = new AvroOptions(options, 
spark.sessionState.newHadoopConf())
-val outputAvroSchema = 

spark git commit: [SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with BarrierTaskContextImpl

2018-08-02 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master bbdcc3bf6 -> 29077a1d1


[SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with 
BarrierTaskContextImpl

## What changes were proposed in this pull request?

According to https://github.com/apache/spark/pull/21758#discussion_r206746905, the current declaration of `BarrierTaskContext` didn't extend the methods from `TaskContext`. Since `TaskContext` is an abstract class and we don't want to change it to a trait, we have to define `BarrierTaskContext` directly as a class.

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang 

Closes #21972 from jiangxb1987/BarrierTaskContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29077a1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29077a1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29077a1d

Branch: refs/heads/master
Commit: 29077a1d15e49dfafe7f2eab963830ba9cc6b29a
Parents: bbdcc3b
Author: Xingbo Jiang 
Authored: Thu Aug 2 17:19:42 2018 -0700
Committer: Xiangrui Meng 
Committed: Thu Aug 2 17:19:42 2018 -0700

--
 .../org/apache/spark/BarrierTaskContext.scala   | 60 +++-
 .../apache/spark/BarrierTaskContextImpl.scala   | 49 
 .../scala/org/apache/spark/rdd/RDDBarrier.scala |  2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  2 +-
 4 files changed, 59 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 4c35862..ba30368 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -17,20 +17,71 @@
 
 package org.apache.spark
 
+import java.util.Properties
+
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
 
 /** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-trait BarrierTaskContext extends TaskContext {
+class BarrierTaskContext(
+override val stageId: Int,
+override val stageAttemptNumber: Int,
+override val partitionId: Int,
+override val taskAttemptId: Long,
+override val attemptNumber: Int,
+override val taskMemoryManager: TaskMemoryManager,
+localProperties: Properties,
+@transient private val metricsSystem: MetricsSystem,
+// The default value is only used in tests.
+override val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, 
taskAttemptId, attemptNumber,
+  taskMemoryManager, localProperties, metricsSystem, taskMetrics) {
 
   /**
* :: Experimental ::
* Sets a global barrier and waits until all tasks in this stage hit this 
barrier. Similar to
* MPI_Barrier function in MPI, the barrier() function call blocks until all 
tasks in the same
* stage have reached this routine.
+   *
+   * CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
+   * possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
+   * timeout. Some examples of misuses listed below:
+   * 1. Only call barrier() function on a subset of all the tasks in the same 
barrier stage, it
+   * shall lead to timeout of the function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *   if (context.partitionId() == 0) {
+   *   // Do nothing.
+   *   } else {
+   *   context.barrier()
+   *   }
+   *   iter
+   *   }
+   * }}}
+   *
+   * 2. Include barrier() function in a try-catch code block, this may lead to 
timeout of the
+   * second function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *   try {
+   *   // Do something that might throw an Exception.
+   *   doSomething()
+   *   context.barrier()
+   *   } catch {
+   *   case e: Exception => logWarning("...", e)
+   *   }
+   *   context.barrier()
+   *   iter
+   *   }
+   * }}}
*/
   @Experimental
   @Since("2.4.0")
-  def barrier(): Unit
+  def barrier(): Unit = {
+// TODO SPARK-24817 implement global barrier.
+  }
 
   /**
* :: Experimental ::
@@ -38,5 +89,8 @@ trait BarrierTaskContext extends TaskContext {
*/
   @Experimental
   @Since("2.4.0")
-  def getTaskInfos(): Array[BarrierTaskInfo]
+  def getTaskInfos(): Array[BarrierTaskInfo] = {
+val addressesStr = 

spark git commit: [SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments"

2018-08-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d0bc3ed67 -> bbdcc3bf6


[SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments"

## What changes were proposed in this pull request?

This PR refactors the code that reads "spark.sql.codegen.comments" so it no longer goes through `SparkEnv.get.conf`. It uses `SQLConf.get.codegenComments` instead, since `SQLConf.get` always returns an instance of `SQLConf`.
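
Since the patch defines `spark.sql.codegen.comments` as a static conf (see `StaticSQLConf` below), it has to be set when the session is built; a minimal sketch, with the builder settings assumed:

```scala
import org.apache.spark.sql.SparkSession

// Static SQL confs cannot be changed at runtime via spark.conf.set.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-comments-demo")
  .config("spark.sql.codegen.comments", "true")  // CodegenContext will now emit comments
  .getOrCreate()
```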

## How was this patch tested?

Added test case to `DebuggingSuite`

Author: Kazuaki Ishizaki 

Closes #19449 from kiszk/SPARK-22219.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbdcc3bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbdcc3bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbdcc3bf

Branch: refs/heads/master
Commit: bbdcc3bf61da39704650d4570c6307b5a46f7100
Parents: d0bc3ed
Author: Kazuaki Ishizaki 
Authored: Thu Aug 2 18:19:04 2018 -0500
Committer: Sean Owen 
Committed: Thu Aug 2 18:19:04 2018 -0500

--
 .../expressions/codegen/CodeGenerator.scala |  7 +--
 .../org/apache/spark/sql/internal/SQLConf.scala |  2 ++
 .../apache/spark/sql/internal/StaticSQLConf.scala   |  8 
 .../sql/internal/ExecutorSideSQLConfSuite.scala | 16 
 4 files changed, 27 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 05500f5..498dd26 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1173,12 +1173,7 @@ class CodegenContext {
text: => String,
placeholderId: String = "",
force: Boolean = false): Block = {
-// By default, disable comments in generated code because computing the 
comments themselves can
-// be extremely expensive in certain cases, such as deeply-nested 
expressions which operate over
-// inputs with wide schemas. For more details on the performance issues 
that motivated this
-// flat, see SPARK-15680.
-if (force ||
-  SparkEnv.get != null && 
SparkEnv.get.conf.getBoolean("spark.sql.codegen.comments", false)) {
+if (force || SQLConf.get.codegenComments) {
   val name = if (placeholderId != "") {
 assert(!placeHolderToComments.contains(placeholderId))
 placeholderId

http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index edc1a48..8f303f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1599,6 +1599,8 @@ class SQLConf extends Serializable with Logging {
 
   def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
+  def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS)
+
   def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
 
   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 384b191..d9c354b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -74,6 +74,14 @@ object StaticSQLConf {
   .checkValue(maxEntries => maxEntries >= 0, "The maximum must not be 
negative")
   .createWithDefault(100)
 
+  val CODEGEN_COMMENTS = buildStaticConf("spark.sql.codegen.comments")
+.internal()
+.doc("When true, put comment in the generated code. Since computing huge 
comments " +
+  "can be extremely expensive in certain cases, such as deeply-nested 
expressions which " +
+  "operate over inputs with wide schemas, default is false.")
+.booleanConf
+

svn commit: r28520 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_16_02-d0bc3ed-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Thu Aug  2 23:16:01 2018
New Revision: 28520

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_02_16_02-d0bc3ed docs


[This commit notification would consist of 1469 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query

2018-08-02 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master efef55388 -> d0bc3ed67


[SPARK-24896][SQL] Uuid should produce different values for each execution in 
streaming query

## What changes were proposed in this pull request?

`Uuid`'s results depend on the random seed assigned during analysis. Thus, in a streaming query, we get the same UUIDs in every execution. This is incorrect for streaming query execution.
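
A simplified restatement of the fix (see the `IncrementalExecution` hunk below); the helper name is illustrative:

```scala
import scala.util.Random

import org.apache.spark.sql.catalyst.expressions.Uuid
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Before optimizing each micro-batch, give every Uuid expression a fresh random seed so
// that repeated executions of the same streaming plan produce different UUID values.
def reseedUuids(plan: LogicalPlan): LogicalPlan = {
  val random = new Random()
  plan.transformAllExpressions { case _: Uuid => Uuid(Some(random.nextLong())) }
}
```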

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #21854 from viirya/uuid_in_streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0bc3ed6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0bc3ed6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0bc3ed6

Branch: refs/heads/master
Commit: d0bc3ed6797e0c06f688b7b2ef6c26282a25b175
Parents: efef553
Author: Liang-Chi Hsieh 
Authored: Thu Aug 2 15:35:46 2018 -0700
Committer: Shixiong Zhu 
Committed: Thu Aug 2 15:35:46 2018 -0700

--
 .../streaming/IncrementalExecution.scala|  8 ++-
 .../sql/streaming/StreamingQuerySuite.scala | 22 +++-
 2 files changed, 28 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 6ae7f28..e9ffe12 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.util.Random
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
+import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -73,10 +75,14 @@ class IncrementalExecution(
* with the desired literal
*/
   override lazy val optimizedPlan: LogicalPlan = {
+val random = new Random()
+
 sparkSession.sessionState.optimizer.execute(withCachedData) 
transformAllExpressions {
   case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
 logInfo(s"Current batch timestamp = $timestamp")
 ts.toLiteral
+  // SPARK-24896: Set the seed for random number generation in Uuid 
expressions.
+  case _: Uuid => Uuid(Some(random.nextLong()))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 78199b0..f37f368 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -21,6 +21,8 @@ import java.{util => ju}
 import java.util.Optional
 import java.util.concurrent.CountDownLatch
 
+import scala.collection.mutable
+
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
@@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
@@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
   CheckLastBatch(("A", 1)))
   }
 
+  test("Uuid in streaming query should not produce same uuids in each 
execution") {
+val uuids = mutable.ArrayBuffer[String]()
+def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] =>
+  rows.foreach(r 

spark git commit: [SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 02f967795 -> efef55388


[SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused

## What changes were proposed in this pull request?
In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs. Then, `ReuseExchange` removes some duplicate exchanges, so the actual number of registered exchanges changes. Finally, the assertion in `ExchangeCoordinator` fails because the expected number of exchanges and the actual number of registered exchanges differ:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

This PR fixes the issue; the code to reproduce it is as follows:
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at 
org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at 
org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro 

Closes #21754 from maropu/SPARK-24705-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efef5538
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efef5538
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efef5538

Branch: refs/heads/master
Commit: efef55388fedef3f7954a385776e666ad4597a58
Parents: 02f9677
Author: Takeshi Yamamuro 
Authored: Thu Aug 2 13:05:36 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 13:05:36 2018 -0700

--
 .../execution/exchange/EnsureRequirements.scala |  1 -
 .../exchange/ExchangeCoordinator.scala  | 17 ++--
 .../execution/ExchangeCoordinatorSuite.scala| 21 
 3 files changed, 28 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index d96ecba..d2d5011 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -82,7 +82,6 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
   if (adaptiveExecutionEnabled && supportsCoordinator) {
 val coordinator =
   new ExchangeCoordinator(
-children.length,
 targetPostShuffleInputSize,
 minNumPostShufflePartitions)
 children.zip(requiredChildDistributions).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 051e610..f5d93ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -83,7 +83,6 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, 
SparkPlan}
  *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-numExchanges: Int,
 advisoryTargetPostShuffleInputSize: Long,
 minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {
@@ 

spark git commit: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 0df6bf882 -> 02f967795


[SPARK-23908][SQL] Add transform function.

## What changes were proposed in this pull request?

This PR adds the `transform` function, which transforms the elements of an array using a given function. Optionally, the function can take the index of each element as its second argument.

```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)
```
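
The same function can also be reached from the DataFrame API via `selectExpr`; a hedged sketch (the session and sample data are assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("transform-demo").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 2, 3)).toDF("xs")
df.selectExpr("transform(xs, x -> x + 1)").show()       // [2, 3, 4]
df.selectExpr("transform(xs, (x, i) -> x + i)").show()  // [1, 3, 5]
```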

## How was this patch tested?

Added tests.

Author: Takuya UESHIN 

Closes #21954 from ueshin/issues/SPARK-23908/transform.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02f96779
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02f96779
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02f96779

Branch: refs/heads/master
Commit: 02f967795b7e8ccf2738d567928e47c38c1134e1
Parents: 0df6bf8
Author: Takuya UESHIN 
Authored: Thu Aug 2 13:00:33 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 13:00:33 2018 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   2 +
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   3 +
 .../catalyst/analysis/FunctionRegistry.scala|   1 +
 .../analysis/higherOrderFunctions.scala | 166 +++
 .../expressions/higherOrderFunctions.scala  | 212 +++
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  10 +
 .../analysis/ResolveLambdaVariablesSuite.scala  |  89 
 .../expressions/HigherOrderFunctionsSuite.scala |  97 +
 .../catalyst/parser/ExpressionParserSuite.scala |   5 +
 .../spark/sql/catalyst/plans/PlanTest.scala |   2 +
 .../sql-tests/inputs/higher-order-functions.sql |  26 +++
 .../results/higher-order-functions.sql.out  |  81 +++
 .../spark/sql/DataFrameFunctionsSuite.scala | 153 +
 13 files changed, 847 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2aca10f..9ad6f30 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -591,6 +591,8 @@ primaryExpression
(OVER windowSpec)?  
#functionCall
 | qualifiedName '(' trimOption=(BOTH | LEADING | TRAILING) 
argument+=expression
   FROM argument+=expression ')'
#functionCall
+| IDENTIFIER '->' expression   
#lambda
+| '(' IDENTIFIER (',' IDENTIFIER)+ ')' '->' expression 
#lambda
 | value=primaryExpression '[' index=valueExpression ']'
#subscript
 | identifier   
#columnReference
 | base=primaryExpression '.' fieldName=identifier  
#dereference

http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 76dc867..7f235ac 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -180,6 +180,8 @@ class Analyzer(
   ResolveAggregateFunctions ::
   TimeWindowing ::
   ResolveInlineTables(conf) ::
+  ResolveHigherOrderFunctions(catalog) ::
+  ResolveLambdaVariables(conf) ::
   ResolveTimeZone(conf) ::
   ResolveRandomSeed ::
   TypeCoercion.typeCoercionRules(conf) ++
@@ -878,6 +880,7 @@ class Analyzer(
 }
 
 private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
+  case f: LambdaFunction if !f.bound => f
   case u @ UnresolvedAttribute(nameParts) =>
 // Leave unchanged if resolution fails. Hopefully will be resolved 
next round.
 val result =

http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

svn commit: r28518 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_12_02-0df6bf8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Thu Aug  2 19:16:55 2018
New Revision: 28518

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_02_12_02-0df6bf8 docs


[This commit notification would consist of 1469 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [BUILD] Fix lint-python.

2018-08-02 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master 38e4699c9 -> 0df6bf882


[BUILD] Fix lint-python.

## What changes were proposed in this pull request?

This PR fixes lint-python.

```
./python/pyspark/accumulators.py:231:9: E306 expected 1 blank line before a 
nested definition, found 0
./python/pyspark/accumulators.py:257:101: E501 line too long (107 > 100 
characters)
./python/pyspark/accumulators.py:264:1: E302 expected 2 blank lines, found 1
./python/pyspark/accumulators.py:281:1: E302 expected 2 blank lines, found 1
```

## How was this patch tested?

Executed lint-python manually.

Author: Takuya UESHIN 

Closes #21973 from ueshin/issues/build/1/fix_lint-python.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6bf88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6bf88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6bf88

Branch: refs/heads/master
Commit: 0df6bf882907d7d76572f513168a144067d0e0ec
Parents: 38e4699
Author: Takuya UESHIN 
Authored: Fri Aug 3 03:18:46 2018 +0900
Committer: Takuya UESHIN 
Committed: Fri Aug 3 03:18:46 2018 +0900

--
 python/pyspark/accumulators.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0df6bf88/python/pyspark/accumulators.py
--
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 1276c31..30ad042 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -228,6 +228,7 @@ class 
_UpdateRequestHandler(SocketServer.StreamRequestHandler):
 def handle(self):
 from pyspark.accumulators import _accumulatorRegistry
 auth_token = self.server.auth_token
+
 def poll(func):
 while not self.server.server_shutdown:
 # Poll every 1 second for new data -- don't block in case of 
shutdown.
@@ -254,13 +255,15 @@ class 
_UpdateRequestHandler(SocketServer.StreamRequestHandler):
 # we've authenticated, we can break out of the first loop now
 return True
 else:
-raise Exception("The value of the provided token to the 
AccumulatorServer is not correct.")
+raise Exception(
+"The value of the provided token to the AccumulatorServer 
is not correct.")
 
 # first we keep polling till we've received the authentication token
 poll(authenticate_and_accum_updates)
 # now we've authenticated, don't need to check for the token anymore
 poll(accum_updates)
 
+
 class AccumulatorServer(SocketServer.TCPServer):
 
 def __init__(self, server_address, RequestHandlerClass, auth_token):
@@ -278,6 +281,7 @@ class AccumulatorServer(SocketServer.TCPServer):
 SocketServer.TCPServer.shutdown(self)
 self.server_close()
 
+
 def _start_update_server(auth_token):
 """Start a TCP server to receive accumulator updates in a daemon thread, 
and returns it"""
 server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler, 
auth_token)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern

2018-08-02 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master ad2e63662 -> 38e4699c9


[SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a 
barrier stage with unsupported RDD chain pattern

## What changes were proposed in this pull request?

Check on job submit to make sure we don't launch a barrier stage with an 
unsupported RDD chain pattern. The following patterns are not supported (see the sketch after this list):
- Ancestor RDDs that have a different number of partitions from the resulting RDD 
(e.g. union()/coalesce()/first()/PartitionPruningRDD);
- An RDD that depends on multiple barrier RDDs (e.g. 
barrierRdd1.zip(barrierRdd2)).
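
A minimal sketch of a chain that should now fail fast, assuming a SparkContext `sc` on a build with this change (the RDDBarrier API is simplified here and may differ slightly):

```
// Hedged sketch: coalesce() changes the partition count within the barrier
// stage, so job submission is expected to throw a SparkException up front
// instead of hanging at runtime.
val rdd = sc.parallelize(1 to 100, numSlices = 8)
  .barrier()
  .mapPartitions(iter => iter)
  .coalesce(2)
rdd.collect()
```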

## How was this patch tested?

Add test cases in `BarrierStageOnSubmittedSuite`.

Author: Xingbo Jiang 

Closes #21927 from jiangxb1987/SPARK-24820.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e4699c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e4699c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e4699c

Branch: refs/heads/master
Commit: 38e4699c978e56a0f24b8efb94fd3206cdd8b3fe
Parents: ad2e636
Author: Xingbo Jiang 
Authored: Thu Aug 2 09:36:26 2018 -0700
Committer: Xiangrui Meng 
Committed: Thu Aug 2 09:36:26 2018 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   |  55 ++-
 .../spark/BarrierStageOnSubmittedSuite.scala| 153 +++
 2 files changed, 207 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4858af7..3dd0718 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -341,6 +341,22 @@ class DAGScheduler(
   }
 
   /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD 
chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the 
resulting RDD (eg.
+   * union()/coalesce()/first()/take()/PartitionPruningRDD);
+   * 2. An RDD that depends on multiple barrier RDDs (eg. 
barrierRdd1.zip(barrierRdd2)).
+   */
+  private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], 
numTasksInStage: Int): Unit = {
+val predicate: RDD[_] => Boolean = (r =>
+  r.getNumPartitions == numTasksInStage && 
r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
+if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
+  throw new SparkException(
+
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+}
+  }
+
+  /**
* Creates a ShuffleMapStage that generates the given shuffle dependency's 
partitions. If a
* previously run stage generated the same shuffle data, this function will 
copy the output
* locations that are still available from the previous shuffle to avoid 
unnecessarily
@@ -348,6 +364,7 @@ class DAGScheduler(
*/
   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: 
Int): ShuffleMapStage = {
 val rdd = shuffleDep.rdd
+checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
 val numTasks = rdd.partitions.length
 val parents = getOrCreateParentStages(rdd, jobId)
 val id = nextStageId.getAndIncrement()
@@ -376,6 +393,7 @@ class DAGScheduler(
   partitions: Array[Int],
   jobId: Int,
   callSite: CallSite): ResultStage = {
+checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
 val parents = getOrCreateParentStages(rdd, jobId)
 val id = nextStageId.getAndIncrement()
 val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, 
callSite)
@@ -451,6 +469,32 @@ class DAGScheduler(
 parents
   }
 
+  /**
+   * Traverses the given RDD and its ancestors within the same stage and 
checks whether all of the
+   * RDDs satisfy a given predicate.
+   */
+  private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => 
Boolean): Boolean = {
+val visited = new HashSet[RDD[_]]
+val waitingForVisit = new 

spark git commit: [SPARK-24598][DOCS] State in the documentation the behavior when arithmetic operations cause overflow

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 15fc23722 -> ad2e63662


[SPARK-24598][DOCS] State in the documentation the behavior when arithmetic 
operations cause overflow

## What changes were proposed in this pull request?

According to the discussion in https://github.com/apache/spark/pull/21599, 
changing the behavior of arithmetic operations so that they check for 
overflow is not appropriate for a minor release. What we can do for 2.4 is warn users 
about the current behavior in the documentation, so that they are aware of the 
issue and can take proper action.
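
A minimal illustration of the behavior being documented; the SQL line assumes a typical session named `spark` with default settings:

```
// Hedged sketch: non-decimal numeric types wrap on overflow rather than erroring.
val wrapped: Int = Int.MaxValue + 1          // -2147483648, no exception
// spark.sql("SELECT 2147483647 + 1").show() // same wrapped result in SQL
```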

## How was this patch tested?

NA

Author: Marco Gaido 

Closes #21967 from mgaido91/SPARK-24598_doc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad2e6366
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad2e6366
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad2e6366

Branch: refs/heads/master
Commit: ad2e63662885b67b1e94030b13fdae4f7366dc4a
Parents: 15fc237
Author: Marco Gaido 
Authored: Thu Aug 2 09:28:13 2018 -0700
Committer: Xiao Li 
Committed: Thu Aug 2 09:28:13 2018 -0700

--
 docs/sql-programming-guide.md | 7 +++
 1 file changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad2e6366/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 5f1eee8..0900f83 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -3072,3 +3072,10 @@ Specifically:
  - In aggregations, all NaN values are grouped together.
  - NaN is treated as a normal value in join keys.
  - NaN values go last when in ascending order, larger than any other numeric 
value.
+ 
+ ## Arithmetic operations
+ 
+Operations performed on numeric types (with the exception of `decimal`) are 
not checked for overflow.
+This means that in case an operation causes an overflow, the result is the 
same that the same operation
+returns in a Java/Scala program (eg. if the sum of 2 integers is higher than 
the maximum value representable,
+the result is a negative number).


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r28514 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_08_02-f04cd67-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Thu Aug  2 15:16:27 2018
New Revision: 28514

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_02_08_02-f04cd67 docs


[This commit notification would consist of 1469 parts, 
which exceeds the limit of 50, so it has been shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR] remove dead code in ExpressionEvalHelper

2018-08-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d182b3d34 -> f04cd6709


[MINOR] remove dead code in ExpressionEvalHelper

## What changes were proposed in this pull request?

This addresses https://github.com/apache/spark/pull/21236/files#r207078480

Both https://github.com/apache/spark/pull/21236 and 
https://github.com/apache/spark/pull/21838 add an InternalRow result check to 
ExpressionEvalHelper, so the check became duplicated.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #21958 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f04cd670
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f04cd670
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f04cd670

Branch: refs/heads/master
Commit: f04cd670943d0eb6eb688a0f50d56293cda554ef
Parents: d182b3d
Author: Wenchen Fan 
Authored: Thu Aug 2 09:26:27 2018 -0500
Committer: Sean Owen 
Committed: Thu Aug 2 09:26:27 2018 -0500

--
 .../spark/sql/catalyst/expressions/ExpressionEvalHelper.scala | 3 ---
 1 file changed, 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f04cd670/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d045267..6684e5c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -105,9 +105,6 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks with PlanTestBa
 if (expected.isNaN) result.isNaN else expected == result
   case (result: Float, expected: Float) =>
 if (expected.isNaN) result.isNaN else expected == result
-  case (result: UnsafeRow, expected: GenericInternalRow) =>
-val structType = exprDataType.asInstanceOf[StructType]
-result.toSeq(structType) == expected.toSeq(structType)
   case (result: Row, expected: InternalRow) => result.toSeq == 
expected.toSeq(result.schema)
   case _ =>
 result == expected


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24742] Fix NullPointerexception in Field Metadata

2018-08-02 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 7be6fc3c7 -> d182b3d34


[SPARK-24742] Fix NullPointerexception in Field Metadata

## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL field Metadata was 
throwing an exception in the hashCode method when a null value was added 
via "putNull".

## How was this patch tested?

A new unit test is provided in org/apache/spark/sql/types/MetadataSuite.scala.

Author: Kaya Kupferschmidt 

Closes #21722 from kupferk/SPARK-24742.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d182b3d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d182b3d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d182b3d3

Branch: refs/heads/master
Commit: d182b3d34d6afade401b8a455b774059bae9d90f
Parents: 7be6fc3
Author: Kaya Kupferschmidt 
Authored: Thu Aug 2 22:23:24 2018 +0800
Committer: hyukjinkwon 
Committed: Thu Aug 2 22:23:24 2018 +0800

--

--



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24742] Fix NullPointerexception in Field Metadata

2018-08-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 46110a589 -> 7be6fc3c7


[SPARK-24742] Fix NullPointerexception in Field Metadata

## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL field Metadata was 
throwing an exception in the hashCode method when a null value was added 
via "putNull".

## How was this patch tested?

A new unit test is provided in org/apache/spark/sql/types/MetadataSuite.scala.

Author: Kaya Kupferschmidt 

Closes #21722 from kupferk/SPARK-24742.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7be6fc3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7be6fc3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7be6fc3c

Branch: refs/heads/master
Commit: 7be6fc3c77b00f0eefd276676524ec4e36bab868
Parents: 46110a5
Author: Kaya Kupferschmidt 
Authored: Thu Aug 2 09:22:21 2018 -0500
Committer: Sean Owen 
Committed: Thu Aug 2 09:22:21 2018 -0500

--
 .../org/apache/spark/sql/types/Metadata.scala   |  2 +
 .../apache/spark/sql/types/MetadataSuite.scala  | 74 
 2 files changed, 76 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7be6fc3c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 352fb54..7c15dc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -215,6 +215,8 @@ object Metadata {
 x.##
   case x: Metadata =>
 hash(x.map)
+  case null =>
+0
   case other =>
 throw new RuntimeException(s"Do not support type ${other.getClass}.")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7be6fc3c/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
new file mode 100644
index 000..210e657
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.SparkFunSuite
+
+class MetadataSuite extends SparkFunSuite {
+  test("String Metadata") {
+val meta = new MetadataBuilder().putString("key", "value").build()
+assert(meta === meta)
+assert(meta.## !== 0)
+assert(meta.getString("key") === "value")
+assert(meta.contains("key"))
+intercept[NoSuchElementException](meta.getString("no_such_key"))
+intercept[ClassCastException](meta.getBoolean("key"))
+  }
+
+  test("Long Metadata") {
+val meta = new MetadataBuilder().putLong("key", 12).build()
+assert(meta === meta)
+assert(meta.## !== 0)
+assert(meta.getLong("key") === 12)
+assert(meta.contains("key"))
+intercept[NoSuchElementException](meta.getLong("no_such_key"))
+intercept[ClassCastException](meta.getBoolean("key"))
+  }
+
+  test("Double Metadata") {
+val meta = new MetadataBuilder().putDouble("key", 12).build()
+assert(meta === meta)
+assert(meta.## !== 0)
+assert(meta.getDouble("key") === 12)
+assert(meta.contains("key"))
+intercept[NoSuchElementException](meta.getDouble("no_such_key"))
+intercept[ClassCastException](meta.getBoolean("key"))
+  }
+
+  test("Boolean Metadata") {
+val meta = new MetadataBuilder().putBoolean("key", true).build()
+assert(meta === meta)
+assert(meta.## !== 0)
+assert(meta.getBoolean("key") === true)
+assert(meta.contains("key"))
+intercept[NoSuchElementException](meta.getBoolean("no_such_key"))
+

spark git commit: [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node

2018-08-02 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master a65736996 -> 46110a589


[SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node

## What changes were proposed in this pull request?
Remove the AnalysisBarrier LogicalPlan node, which is no longer needed.

## How was this patch tested?
N/A

Author: Xiao Li 

Closes #21962 from gatorsmile/refactor2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46110a58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46110a58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46110a58

Branch: refs/heads/master
Commit: 46110a589f4e91cd7605c5a2c34c3db6b2635830
Parents: a657369
Author: Xiao Li 
Authored: Thu Aug 2 22:20:41 2018 +0800
Committer: hyukjinkwon 
Committed: Thu Aug 2 22:20:41 2018 +0800

--
 .../plans/logical/basicLogicalOperators.scala   | 20 
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +-
 .../spark/sql/execution/CacheManager.scala  |  2 +-
 3 files changed, 2 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 13b5130..68413d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -924,23 +924,3 @@ case class Deduplicate(
 
   override def output: Seq[Attribute] = child.output
 }
-
-/**
- * A logical plan for setting a barrier of analysis.
- *
- * The SQL Analyzer goes through a whole query plan even most part of it is 
analyzed. This
- * increases the time spent on query analysis for long pipelines in ML, 
especially.
- *
- * This logical plan wraps an analyzed logical plan to prevent it from 
analysis again. The barrier
- * is applied to the analyzed logical plan in Dataset. It won't change the 
output of wrapped
- * logical plan and just acts as a wrapper to hide it from analyzer. New 
operations on the dataset
- * will be put on the barrier, so only the new nodes created will be analyzed.
- *
- * This analysis barrier will be removed at the end of analysis stage.
- */
-case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(child)
-  override def output: Seq[Attribute] = child.output
-  override def isStreaming: Boolean = child.isStreaming
-  override def doCanonicalize(): LogicalPlan = child.canonicalized
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3c9e743..cd7dc2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, 
InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}

http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index ed130dc..c992993 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import 

spark git commit: [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support

2018-08-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 275415777 -> a65736996


[SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support

## What changes were proposed in this pull request?
This PR addresses issues 2 and 3 in this 
[document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via 
the LambdaMetaFactory mechanism (serialized lambdas) (issue 2).

* We also fix the issue caused by scala/bug#11016. There are two options for 
solving the Unit-return ambiguity: either add () at the end of the closure, or use the 
trick described in the doc; otherwise overload resolution does not work (we 
are not going to eliminate either of the methods). The compiler adapts the closure 
result to Unit and makes both methods candidates for overloading; with a 
polymorphic overload there is no ambiguity, and that is the workaround 
implemented (a simplified sketch follows this list). It is not pretty, but it serves 
its purpose: we need to support two different uses of `addTaskCompletionListener`, 
one that passes a TaskCompletionListener and one that passes a closure that is 
wrapped with a TaskCompletionListener later on (issue 3).
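
A minimal sketch of the polymorphic-overload workaround, with simplified names that are not the actual TaskContext API:

```
// Hedged sketch (assumed names, not Spark's real signatures).
trait CompletionListener { def onTaskCompletion(ctx: Ctx): Unit }

abstract class Ctx {
  // SAM form: a `ctx => ...` lambda can also convert to CompletionListener
  // under Scala 2.12, which is what made the overloads ambiguous.
  def addTaskCompletionListener(listener: CompletionListener): Ctx

  // Making the closure overload polymorphic in its result type removes the
  // ambiguity for Unit-returning closures (the workaround described above).
  def addTaskCompletionListener[U](f: Ctx => U): Ctx =
    addTaskCompletionListener(new CompletionListener {
      override def onTaskCompletion(ctx: Ctx): Unit = f(ctx)
    })
}
```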

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users 
> who would want to use 2.11 binaries. In that case they can cast to 
> MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be 
> simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```
./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None
```

Author: Stavros Kontopoulos 

Closes #21930 from skonto/scala2.12-sup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6573699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6573699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6573699

Branch: refs/heads/master
Commit: a65736996b2b506f61cc8d599ec9f4c52a1b5312
Parents: 2754157
Author: Stavros Kontopoulos 
Authored: Thu Aug 2 09:17:09 2018 -0500
Committer: Sean Owen 
Committed: Thu Aug 2 09:17:09 2018 -0500

--
 .../scala/org/apache/spark/TaskContext.scala|   5 +-
 .../apache/spark/api/python/PythonRunner.scala  |   2 +-
 .../spark/broadcast/TorrentBroadcast.scala  |   2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   2 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala|   2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala |   2 +-
 .../spark/rdd/ReliableCheckpointRDD.scala   |   2 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |   2 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |   2 +-
 .../spark/storage/memory/MemoryStore.scala  |   2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 278 ---
 .../util/collection/ExternalAppendOnlyMap.scala |   2 +-
 .../apache/spark/util/ClosureCleanerSuite.scala |   3 +
 .../spark/util/ClosureCleanerSuite2.scala   |  53 +++-
 .../apache/spark/sql/avro/AvroFileFormat.scala  |   2 +-
 .../spark/sql/kafka010/KafkaSourceRDD.scala |   2 +-
 .../spark/streaming/kafka010/KafkaRDD.scala |   2 +-
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   2 +-
 .../aggregate/TungstenAggregationIterator.scala |   2 +-
 .../sql/execution/arrow/ArrowConverters.scala   |   4 +-
 .../columnar/InMemoryTableScanExec.scala|   2 +-
 .../execution/datasources/CodecStreams.scala|   2 +-
 .../sql/execution/datasources/FileScanRDD.scala |   2 +-
 .../datasources/csv/CSVDataSource.scala |   2 +-
 .../execution/datasources/jdbc/JDBCRDD.scala|   2 +-
 .../datasources/json/JsonDataSource.scala   |   2 +-
 .../datasources/orc/OrcFileFormat.scala |   4 +-
 .../datasources/parquet/ParquetFileFormat.scala |   4 +-
 .../datasources/text/TextFileFormat.scala   |   2 +-
 .../datasources/v2/DataSourceRDD.scala  |   2 +-
 .../spark/sql/execution/joins/HashJoin.scala|   2 +-
 .../execution/joins/ShuffledHashJoinExec.scala  |   2 +-
 .../python/AggregateInPandasExec.scala  |   2 +-
 .../execution/python/ArrowPythonRunner.scala|   2 +-
 .../sql/execution/python/EvalPythonExec.scala   |   2 +-
 .../execution/python/PythonForeachWriter.scala  |   2 +-
 .../execution/python/WindowInPandasExec.scala   |   2 +-
 .../continuous/ContinuousCoalesceRDD.scala  |   5 +-
 .../continuous/ContinuousQueuedDataReader.scala |   2 +-
 

spark git commit: [SPARK-24795][CORE][FOLLOWUP] Kill all running tasks when a task in a barrier stage fail

2018-08-02 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 57d994994 -> 275415777


[SPARK-24795][CORE][FOLLOWUP] Kill all running tasks when a task in a barrier 
stage fail

## What changes were proposed in this pull request?

Kill all running tasks when a task in a barrier stage fails in the middle. 
`TaskScheduler.cancelTasks()` would also fail the job, so we implemented a new 
method, `killAllTaskAttempts()`, that just kills all running tasks of a stage 
without cancelling the stage/job (see the sketch below).
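
A minimal sketch of the distinction, with signatures taken from the diff below but presented outside the full trait:

```
// Hedged sketch: the two code paths described above.
trait TaskSchedulerSketch {
  // Cancels the stage's tasks AND fails the associated job.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  // Only kills the running task attempts of the stage; the stage/job is not
  // cancelled, so the stage can be resubmitted.
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
}
```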

## How was this patch tested?

Add new test cases in `TaskSchedulerImplSuite`.

Author: Xingbo Jiang 

Closes #21943 from jiangxb1987/killAllTasks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27541577
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27541577
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27541577

Branch: refs/heads/master
Commit: 275415777b84b82aa5409e6577e1efaff1d989e7
Parents: 57d9949
Author: Xingbo Jiang 
Authored: Thu Aug 2 20:54:36 2018 +0800
Committer: Wenchen Fan 
Committed: Thu Aug 2 20:54:36 2018 +0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 14 +++--
 .../apache/spark/scheduler/TaskScheduler.scala  |  8 ++-
 .../spark/scheduler/TaskSchedulerImpl.scala | 34 +++
 .../spark/scheduler/DAGSchedulerSuite.scala |  6 ++
 .../scheduler/ExternalClusterManagerSuite.scala |  2 +
 .../scheduler/TaskSchedulerImplSuite.scala  | 62 
 6 files changed, 109 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27541577/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 003d64f..4858af7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1433,17 +1433,18 @@ class DAGScheduler(
 val failedStage = stageIdToStage(task.stageId)
 logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to 
a barrier task " +
   "failed.")
-val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully.\n" +
   failure.toErrorString
 try {
-  // cancelTasks will fail if a SchedulerBackend does not implement 
killTask
-  taskScheduler.cancelTasks(stageId, interruptThread = false)
+  // killAllTaskAttempts will fail if a SchedulerBackend does not 
implement killTask.
+  val reason = s"Task $task from barrier stage $failedStage 
(${failedStage.name}) failed."
+  taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, 
reason)
 } catch {
   case e: UnsupportedOperationException =>
 // Cannot continue with barrier stage if failed to cancel zombie 
barrier tasks.
 // TODO SPARK-24877 leave the zombie tasks and ignore their 
completion events.
-logWarning(s"Could not cancel tasks for stage $stageId", e)
-abortStage(failedStage, "Could not cancel zombie barrier tasks for 
stage " +
+logWarning(s"Could not kill all tasks for stage $stageId", e)
+abortStage(failedStage, "Could not kill zombie barrier tasks for 
stage " +
   s"$failedStage (${failedStage.name})", Some(e))
 }
 markStageAsFinished(failedStage, Some(message))
@@ -1457,7 +1458,8 @@ class DAGScheduler(
 
 if (shouldAbortStage) {
   val abortMessage = if (disallowStageRetryForTest) {
-"Barrier stage will not retry stage due to testing config"
+"Barrier stage will not retry stage due to testing config. Most 
recent failure " +
+  s"reason: $message"
   } else {
 s"""$failedStage (${failedStage.name})
|has failed the maximum allowable number of

http://git-wip-us.apache.org/repos/asf/spark/blob/27541577/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 90644fe..95f7ae4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -51,16 +51,22 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Cancel a stage.
+  // Kill 

svn commit: r28508 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_00_02-57d9949-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-02 Thread pwendell
Author: pwendell
Date: Thu Aug  2 07:16:48 2018
New Revision: 28508

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_02_00_02-57d9949 docs


[This commit notification would consist of 1469 parts, 
which exceeds the limit of 50, so it has been shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24557][ML] ClusteringEvaluator support array input

2018-08-02 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 166f34618 -> 57d994994


[SPARK-24557][ML] ClusteringEvaluator support array input

## What changes were proposed in this pull request?
Make ClusteringEvaluator support array-typed feature columns in addition to Vector columns.
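
A minimal usage sketch, assuming a SparkSession with `spark.implicits._` imported; after this change the features column may be an array of doubles rather than an ml Vector:

```
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Hedged sketch: array-typed features are converted to vectors internally.
val df = Seq(
  (0, Array(0.0, 0.1)), (0, Array(0.1, 0.0)),
  (1, Array(5.0, 5.1)), (1, Array(5.1, 5.0))
).toDF("prediction", "features")

val silhouette = new ClusteringEvaluator()
  .setPredictionCol("prediction")
  .setFeaturesCol("features")
  .evaluate(df)
```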

## How was this patch tested?
added tests

Author: zhengruifeng 

Closes #21563 from zhengruifeng/clu_eval_support_array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57d99499
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57d99499
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57d99499

Branch: refs/heads/master
Commit: 57d994994d27154f57f2724924c42beb2ab2e0e7
Parents: 166f346
Author: zhengruifeng 
Authored: Wed Aug 1 23:46:01 2018 -0700
Committer: Xiangrui Meng 
Committed: Wed Aug 1 23:46:01 2018 -0700

--
 .../spark/ml/evaluation/ClusteringEvaluator.scala| 15 +--
 .../ml/evaluation/ClusteringEvaluatorSuite.scala | 15 ++-
 2 files changed, 23 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/57d99499/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
index 4353c46..a6d6b4e 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
@@ -21,11 +21,10 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, 
Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, 
Vectors}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
-import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, 
Identifiable,
-  SchemaUtils}
+import org.apache.spark.ml.util._
 import org.apache.spark.sql.{Column, DataFrame, Dataset}
 import org.apache.spark.sql.functions.{avg, col, udf}
 import org.apache.spark.sql.types.DoubleType
@@ -107,15 +106,19 @@ class ClusteringEvaluator @Since("2.3.0") 
(@Since("2.3.0") override val uid: Str
 
   @Since("2.3.0")
   override def evaluate(dataset: Dataset[_]): Double = {
-SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
+SchemaUtils.validateVectorCompatibleColumn(dataset.schema, $(featuresCol))
 SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
 
+val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol))
+val df = dataset.select(col($(predictionCol)),
+  vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata))
+
 ($(metricName), $(distanceMeasure)) match {
   case ("silhouette", "squaredEuclidean") =>
 SquaredEuclideanSilhouette.computeSilhouetteScore(
-  dataset, $(predictionCol), $(featuresCol))
+  df, $(predictionCol), $(featuresCol))
   case ("silhouette", "cosine") =>
-CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), 
$(featuresCol))
+CosineSilhouette.computeSilhouetteScore(df, $(predictionCol), 
$(featuresCol))
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/57d99499/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 2c175ff..e2d7756 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Dataset
@@ -33,10 +33,17 @@ class ClusteringEvaluatorSuite
   import testImplicits._
 
   @transient var irisDataset: Dataset[_] = _
+  @transient var newIrisDataset: Dataset[_] = _
+  

spark git commit: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

2018-08-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master c9914cf04 -> 166f34618


[SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

## What changes were proposed in this pull request?
This PR refactors the code in AVERAGE using the catalyst expression dsl.
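
A minimal illustration of the dsl-style rewrite, assuming the catalyst dsl imports; the snippet is illustrative and not tied to a specific Spark version:

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Cast, Coalesce, Literal}
import org.apache.spark.sql.types.DoubleType

// Hedged sketch: the same expression tree built with explicit constructors
// and with the dsl helpers (`cast`, plus the `coalesce` added in this PR).
val before = Coalesce(Seq(Cast(Literal(0), DoubleType), Cast(Literal(0), DoubleType)))
val after  = coalesce(Literal(0).cast(DoubleType), Literal(0).cast(DoubleType))
```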

## How was this patch tested?
N/A

Author: Xiao Li 

Closes #21951 from gatorsmile/refactor1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/166f3461
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/166f3461
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/166f3461

Branch: refs/heads/master
Commit: 166f346185cc0b27a7e2b2a3b42df277e5901f2f
Parents: c9914cf
Author: Xiao Li 
Authored: Wed Aug 1 23:00:17 2018 -0700
Committer: Xiao Li 
Committed: Wed Aug 1 23:00:17 2018 -0700

--
 .../scala/org/apache/spark/sql/catalyst/dsl/package.scala |  1 +
 .../sql/catalyst/expressions/aggregate/Average.scala  | 10 --
 2 files changed, 5 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/166f3461/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 89e8c99..9870854 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -166,6 +166,7 @@ package object dsl {
 def maxDistinct(e: Expression): Expression = 
Max(e).toAggregateExpression(isDistinct = true)
 def upper(e: Expression): Expression = Upper(e)
 def lower(e: Expression): Expression = Lower(e)
+def coalesce(args: Expression*): Expression = Coalesce(args)
 def sqrt(e: Expression): Expression = Sqrt(e)
 def abs(e: Expression): Expression = Abs(e)
 def star(names: String*): Expression = names match {

http://git-wip-us.apache.org/repos/asf/spark/blob/166f3461/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
index 9ccf5aa..f1fad77 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
@@ -46,7 +46,7 @@ abstract class AverageLike(child: Expression) extends 
DeclarativeAggregate {
   override lazy val aggBufferAttributes = sum :: count :: Nil
 
   override lazy val initialValues = Seq(
-/* sum = */ Cast(Literal(0), sumDataType),
+/* sum = */ Literal(0).cast(sumDataType),
 /* count = */ Literal(0L)
   )
 
@@ -58,18 +58,16 @@ abstract class AverageLike(child: Expression) extends 
DeclarativeAggregate {
   // If all input are nulls, count will be 0 and we will get null after the 
division.
   override lazy val evaluateExpression = child.dataType match {
 case _: DecimalType =>
-  Cast(
-DecimalPrecision.decimalAndDecimal(sum / Cast(count, 
DecimalType.LongDecimal)),
-resultType)
+  DecimalPrecision.decimalAndDecimal(sum / 
count.cast(DecimalType.LongDecimal)).cast(resultType)
 case _ =>
-  Cast(sum, resultType) / Cast(count, resultType)
+  sum.cast(resultType) / count.cast(resultType)
   }
 
   protected def updateExpressionsDef: Seq[Expression] = Seq(
 /* sum = */
 Add(
   sum,
-  Coalesce(Cast(child, sumDataType) :: Cast(Literal(0), sumDataType) :: 
Nil)),
+  coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))),
 /* count = */ If(IsNull(child), count, count + 1L)
   )
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org