spark git commit: [SPARK-24997][SQL] Enable support of MINUS ALL
Repository: spark Updated Branches: refs/heads/master b0d6967d4 -> 19a453191 [SPARK-24997][SQL] Enable support of MINUS ALL ## What changes were proposed in this pull request? Enable support for MINUS ALL which was gated at AstBuilder. ## How was this patch tested? Added tests in SQLQueryTestSuite and modify PlanParserSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Dilip Biswal Closes #21963 from dilipbiswal/minus-all. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19a45319 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19a45319 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19a45319 Branch: refs/heads/master Commit: 19a45319130d618a173f5f3b4dde59356b39089b Parents: b0d6967 Author: Dilip Biswal Authored: Thu Aug 2 22:45:10 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 22:45:10 2018 -0700 -- .../spark/sql/catalyst/parser/AstBuilder.scala | 11 +- .../sql/catalyst/parser/PlanParserSuite.scala | 4 +- .../resources/sql-tests/inputs/except-all.sql | 22 ++- .../sql-tests/results/except-all.sql.out| 147 +++ 4 files changed, 113 insertions(+), 71 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 0ceeb53..9906a30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -517,11 +517,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Connect two queries by a Set operator. 
* * Supported Set operators are: - * - UNION [DISTINCT] - * - UNION ALL - * - EXCEPT [DISTINCT] - * - MINUS [DISTINCT] - * - INTERSECT [DISTINCT] + * - UNION [ DISTINCT | ALL ] + * - EXCEPT [ DISTINCT | ALL ] + * - MINUS [ DISTINCT | ALL ] + * - INTERSECT [DISTINCT | ALL] */ override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) { val left = plan(ctx.left) @@ -541,7 +540,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.EXCEPT => Except(left, right) case SqlBaseParser.SETMINUS if all => -throw new ParseException("MINUS ALL is not supported.", ctx) +Except(left, right, isAll = true) case SqlBaseParser.SETMINUS => Except(left, right) } http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 38efd89..9247004 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -67,11 +67,13 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select * from a union all select * from b", a.union(b)) assertEqual("select * from a except select * from b", a.except(b)) assertEqual("select * from a except distinct select * from b", a.except(b)) +assertEqual("select * from a except all select * from b", a.except(b, isAll = true)) assertEqual("select * from a minus select * from b", a.except(b)) -intercept("select * from a minus all select * from b", "MINUS ALL is not supported.") +assertEqual("select * from a minus all select * from b", a.except(b, isAll = true)) assertEqual("select * from a minus distinct select * from b", a.except(b)) assertEqual("select * from a intersect select * from b", a.intersect(b)) assertEqual("select * from a intersect distinct select * from b", a.intersect(b)) +assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true)) } test("common table expressions") { http://git-wip-us.apache.org/repos/asf/spark/blob/19a45319/sql/core/src/test/resources/sql-tests/inputs/except-all.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql index 08b9a43..e28f072
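A minimal usage sketch (not taken from the patch itself): with a build that includes this change, `MINUS ALL` parses and follows `EXCEPT ALL` semantics, i.e. duplicates are preserved instead of being removed. The snippet assumes a spark-shell session where `spark` is predefined; table names and data are illustrative.

```scala
import spark.implicits._

// Two small tables; t1 contains a duplicate of 1.
Seq(1, 1, 2, 3).toDF("c").createOrReplaceTempView("t1")
Seq(1, 2).toDF("c").createOrReplaceTempView("t2")

// Previously the parser rejected this with "MINUS ALL is not supported.";
// now it is translated to Except(..., isAll = true), i.e. EXCEPT ALL semantics.
spark.sql("SELECT c FROM t1 MINUS ALL SELECT c FROM t2").show()
// Expected rows (surviving duplicate counts are kept): 1 and 3.
```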
spark git commit: [SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail
Repository: spark Updated Branches: refs/heads/master f45d60a5a -> b0d6967d4 [SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail ## What changes were proposed in this pull request? In the current master, `toString` throws an exception when `RelationalGroupedDataset` has unresolved expressions; ``` scala> spark.range(0, 10).groupBy("id") res4: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [id: bigint], value: [id: bigint], type: GroupBy] scala> spark.range(0, 10).groupBy('id) org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'id at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474) at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473) at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332) at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337) at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345) ``` This pr fixed code to handle the unresolved case in `RelationalGroupedDataset.toString`. Closes #21752 ## How was this patch tested? Added tests in `DataFrameAggregateSuite`. Author: Chris Horn Author: Takeshi Yamamuro Closes #21964 from maropu/SPARK-24788. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0d6967d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0d6967d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0d6967d Branch: refs/heads/master Commit: b0d6967d45f3260ed4ee9b2a49f801d799e81283 Parents: f45d60a Author: Chris Horn Authored: Thu Aug 2 22:40:58 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 22:40:58 2018 -0700 -- .../org/apache/spark/sql/RelationalGroupedDataset.scala | 7 +-- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 10 ++ 2 files changed, 15 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b0d6967d/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 8412219..4e73b36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -469,8 +469,11 @@ class RelationalGroupedDataset protected[sql]( override def toString: String = { val builder = new StringBuilder builder.append("RelationalGroupedDataset: [grouping expressions: [") -val kFields = groupingExprs.map(_.asInstanceOf[NamedExpression]).map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" +val kFields = groupingExprs.collect { + case expr: NamedExpression if expr.resolved => +s"${expr.name}: ${expr.dataType.simpleString(2)}" + case expr: NamedExpression => expr.name + case o => o.toString } builder.append(kFields.take(2).mkString(", ")) if (kFields.length > 2) { http://git-wip-us.apache.org/repos/asf/spark/blob/b0d6967d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index f495a94..d0106c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -717,4 +717,14 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { Row(1, 2, 1) :: Row(2, 2, 2) :: Row(3, 2, 3) :: Nil) } + test("SPARK-24788: RelationalGroupedDataset.toString with unresolved exprs should not fail") { +// Checks if these raise no
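A small hedged illustration of the fixed behavior (spark-shell style, where `spark` is the predefined session); the exact description string may differ slightly from the comment below.

```scala
import spark.implicits._

// groupBy with a Column built from an attribute name leaves unresolved grouping
// expressions inside the RelationalGroupedDataset.
val grouped = spark.range(0, 10).groupBy($"id")

// Before this patch, toString threw UnresolvedException ("Invalid call to dataType
// on unresolved object"); with the patch it falls back to the expression name:
println(grouped.toString)
// e.g. RelationalGroupedDataset: [grouping expressions: [id], value: [id: bigint], type: GroupBy]
```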
spark git commit: [SPARK-25002][SQL] Avro: revise the output record namespace
Repository: spark Updated Branches: refs/heads/master 73dd6cf9b -> f45d60a5a [SPARK-25002][SQL] Avro: revise the output record namespace ## What changes were proposed in this pull request? Currently the output namespace is starting with ".", e.g. `.topLevelRecord` Although it is valid according to Avro spec, we should remove the starting dot in case of failures when the output Avro file is read by other lib: https://github.com/linkedin/goavro/pull/96 ## How was this patch tested? Unit test Author: Gengliang Wang Closes #21974 from gengliangwang/avro_namespace. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f45d60a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f45d60a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f45d60a5 Branch: refs/heads/master Commit: f45d60a5a1f1e97ecde36eda8202034d78f93d53 Parents: 73dd6cf Author: Gengliang Wang Authored: Fri Aug 3 13:28:44 2018 +0800 Committer: Wenchen Fan Committed: Fri Aug 3 13:28:44 2018 +0800 -- .../apache/spark/sql/avro/SchemaConverters.scala | 6 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 17 + 2 files changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f45d60a5/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala -- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 1e91207..6929539 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -142,7 +142,11 @@ object SchemaConverters { builder.map() .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace, outputTimestampType)) case st: StructType => -val nameSpace = s"$prevNameSpace.$recordName" +val nameSpace = prevNameSpace match { + case "" => recordName + case _ => s"$prevNameSpace.$recordName" +} + val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() st.foreach { f => val fieldAvroType = http://git-wip-us.apache.org/repos/asf/spark/blob/f45d60a5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala -- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 085c8c8..b4dcf6c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -884,6 +884,23 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { case class NestedTop(id: Int, data: NestedMiddle) + test("Validate namespace in avro file that has nested records with the same name") { +withTempPath { dir => + val writeDf = spark.createDataFrame(List(NestedTop(1, NestedMiddle(2, NestedBottom(3, "1") + writeDf.write.format("avro").save(dir.toString) + val file = new File(dir.toString) +.listFiles() +.filter(_.isFile) +.filter(_.getName.endsWith("avro")) +.head + val reader = new DataFileReader(file, new GenericDatumReader[Any]()) + val schema = reader.getSchema.toString() + assert(schema.contains("\"namespace\":\"topLevelRecord\"")) + assert(schema.contains("\"namespace\":\"topLevelRecord.data\"")) + assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\"")) +} + } + test("saving avro that has nested records with the same name") { withTempPath 
{ tempDir => // Save avro file on output folder path
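A standalone sketch of the namespace rule this patch applies; `childNamespace` is an illustrative helper, not the actual `SchemaConverters` API.

```scala
// Top-level records get a namespace with no leading dot; nested records append
// the record name to the parent namespace with a dot separator.
def childNamespace(parentNamespace: String, recordName: String): String =
  if (parentNamespace.isEmpty) recordName else s"$parentNamespace.$recordName"

assert(childNamespace("", "topLevelRecord") == "topLevelRecord") // was ".topLevelRecord" before the fix
assert(childNamespace("topLevelRecord", "data") == "topLevelRecord.data")
assert(childNamespace("topLevelRecord.data", "data") == "topLevelRecord.data.data")
```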
svn commit: r28523 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_02_22_01-8080c93-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 05:15:41 2018 New Revision: 28523 Log: Apache Spark 2.3.3-SNAPSHOT-2018_08_02_22_01-8080c93 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24966][SQL] Implement precedence rules for set operations.
Repository: spark Updated Branches: refs/heads/master b3f2911ee -> 73dd6cf9b [SPARK-24966][SQL] Implement precedence rules for set operations. ## What changes were proposed in this pull request? Currently the set operations INTERSECT, UNION and EXCEPT are assigned the same precedence. This PR fixes the problem by giving INTERSECT higher precedence than UNION and EXCEPT. UNION and EXCEPT operators are evaluated in the order in which they appear in the query from left to right. This results in change in behavior because of the change in order of evaluations of set operators in a query. The old behavior is still preserved under a newly added config parameter. Query `:` ``` SELECT * FROM t1 UNION SELECT * FROM t2 EXCEPT SELECT * FROM t3 INTERSECT SELECT * FROM t4 ``` Parsed plan before the change `:` ``` == Parsed Logical Plan == 'Intersect false :- 'Except false : :- 'Distinct : : +- 'Union : : :- 'Project [*] : : : +- 'UnresolvedRelation `t1` : : +- 'Project [*] : :+- 'UnresolvedRelation `t2` : +- 'Project [*] : +- 'UnresolvedRelation `t3` +- 'Project [*] +- 'UnresolvedRelation `t4` ``` Parsed plan after the change `:` ``` == Parsed Logical Plan == 'Except false :- 'Distinct : +- 'Union : :- 'Project [*] : : +- 'UnresolvedRelation `t1` : +- 'Project [*] :+- 'UnresolvedRelation `t2` +- 'Intersect false :- 'Project [*] : +- 'UnresolvedRelation `t3` +- 'Project [*] +- 'UnresolvedRelation `t4` ``` ## How was this patch tested? Added tests in PlanParserSuite, SQLQueryTestSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Dilip Biswal Closes #21941 from dilipbiswal/SPARK-24966. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73dd6cf9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73dd6cf9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73dd6cf9 Branch: refs/heads/master Commit: 73dd6cf9b558f9d752e1f3c13584344257ad7863 Parents: b3f2911 Author: Dilip Biswal Authored: Thu Aug 2 22:04:17 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 22:04:17 2018 -0700 -- docs/sql-programming-guide.md | 1 + .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 15 ++- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../spark/sql/catalyst/parser/ParseDriver.scala | 2 + .../plans/logical/basicLogicalOperators.scala | 6 +- .../org/apache/spark/sql/internal/SQLConf.scala | 12 +++ .../sql/catalyst/parser/PlanParserSuite.scala | 45 .../spark/sql/execution/SparkStrategies.scala | 4 +- .../sql-tests/inputs/intersect-all.sql | 51 +++-- .../sql-tests/results/intersect-all.sql.out | 104 +++ 10 files changed, 211 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73dd6cf9/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 0900f83..a1e019c 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1876,6 +1876,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3 to 2.4 + - Since Spark 2.4, Spark will evaluate the set operations referenced in a query by following a precedence rule as per the SQL standard. If the order is not specified by parentheses, set operations are performed from left to right with the exception that all INTERSECT operations are performed before any UNION, EXCEPT or MINUS operations. 
The old behaviour of giving equal precedence to all the set operations are preserved under a newly added configuaration `spark.sql.legacy.setopsPrecedence.enabled` with a default value of `false`. When this property is set to `true`, spark will evaluate the set operators from left to right as they appear in the query given no explicit ordering is enforced by usage of parenthesis. - Since Spark 2.4, Spark will display table description column Last Access value as UNKNOWN when the value was Jan 01 1970. - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively. - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by
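A hedged spark-shell sketch of the new precedence and the legacy flag (`spark` is the predefined session; table contents are illustrative):

```scala
import spark.implicits._

Seq(1, 2, 3).toDF("c").createOrReplaceTempView("t1")
Seq(3, 4).toDF("c").createOrReplaceTempView("t2")
Seq(4).toDF("c").createOrReplaceTempView("t3")
Seq(4, 5).toDF("c").createOrReplaceTempView("t4")

val q = """SELECT * FROM t1 UNION SELECT * FROM t2
          |EXCEPT SELECT * FROM t3 INTERSECT SELECT * FROM t4""".stripMargin

// Default (SQL-standard) precedence: t3 INTERSECT t4 is evaluated first,
// so the parsed plan is (t1 UNION t2) EXCEPT (t3 INTERSECT t4).
spark.sql(q).explain(true)

// Restore the pre-2.4 left-to-right evaluation of all set operators:
spark.conf.set("spark.sql.legacy.setopsPrecedence.enabled", "true")
spark.sql(q).explain(true)
```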
spark git commit: [PYSPARK] Updates to Accumulators
Repository: spark Updated Branches: refs/heads/branch-2.3 5b187a85a -> 8080c937d [PYSPARK] Updates to Accumulators (cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8080c937 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8080c937 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8080c937 Branch: refs/heads/branch-2.3 Commit: 8080c937d3752aee2fd36f0045a057f7130f6fe4 Parents: 5b187a8 Author: LucaCanali Authored: Wed Jul 18 23:19:02 2018 +0200 Committer: Imran Rashid Committed: Thu Aug 2 21:05:03 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 12 +++-- python/pyspark/accumulators.py | 53 +++- python/pyspark/context.py | 5 +- 3 files changed, 53 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8080c937/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a1ee2f7..8bc0ff7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -586,8 +586,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By */ private[spark] class PythonAccumulatorV2( @transient private val serverHost: String, -private val serverPort: Int) - extends CollectionAccumulator[Array[Byte]] { +private val serverPort: Int, +private val secretToken: String) + extends CollectionAccumulator[Array[Byte]] with Logging{ Utils.checkHost(serverHost) @@ -602,12 +603,17 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) + logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } socket } // Need to override so the types match with PythonFunction - override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort) + override def copyAndReset(): PythonAccumulatorV2 = { +new PythonAccumulatorV2(serverHost, serverPort, secretToken) + } override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized { val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2] http://git-wip-us.apache.org/repos/asf/spark/blob/8080c937/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 6ef8cf5..bc0be07 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,20 +228,49 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry -while not self.server.server_shutdown: -# Poll every 1 second for new data -- don't block in case of shutdown. 
-r, _, _ = select.select([self.rfile], [], [], 1) -if self.rfile in r: -num_updates = read_int(self.rfile) -for _ in range(num_updates): -(aid, update) = pickleSer._read_with_length(self.rfile) -_accumulatorRegistry[aid] += update -# Write a byte in acknowledgement -self.wfile.write(struct.pack("!b", 1)) +auth_token = self.server.auth_token + +def poll(func): +while not self.server.server_shutdown: +# Poll every 1 second for new data -- don't block in case of shutdown. +r, _, _ = select.select([self.rfile], [], [], 1) +if self.rfile in r: +if func(): +break + +def accum_updates(): +num_updates = read_int(self.rfile) +for _ in range(num_updates): +(aid, update) = pickleSer._read_with_length(self.rfile) +_accumulatorRegistry[aid] += update +# Write a byte in acknowledgement +self.wfile.write(struct.pack("!b", 1)) +return False + +def authenticate_and_accum_updates(): +received_token = self.rfile.read(len(auth_token)) +if isinstance(received_token, bytes): +
svn commit: r28522 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_20_01-b3f2911-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 03:15:49 2018 New Revision: 28522 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_02_20_01-b3f2911 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24945][SQL] Switching to uniVocity 2.7.3
Repository: spark Updated Branches: refs/heads/master 7cf16a7fa -> b3f2911ee [SPARK-24945][SQL] Switching to uniVocity 2.7.3 ## What changes were proposed in this pull request? In the PR, I propose to upgrade uniVocity parser from **2.6.3** to **2.7.3**. The recent version includes a fix for the SPARK-24645 issue and has better performance. Before changes: ``` Parsing quoted values: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative One quoted string 6 / 34122 0.0 666727.0 1.0X Wide rows with 1000 columns: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative Select 1000 columns 90287 / 91713 0.0 90286.9 1.0X Select 100 columns 31826 / 36589 0.0 31826.4 2.8X Select one column 25738 / 25872 0.0 25737.9 3.5X count() 6931 / 7269 0.1 6931.5 13.0X ``` after: ``` Parsing quoted values: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative One quoted string 33411 / 33510 0.0 668211.4 1.0X Wide rows with 1000 columns: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative Select 1000 columns 88028 / 89311 0.0 88028.1 1.0X Select 100 columns 29010 / 32755 0.0 29010.1 3.0X Select one column 22936 / 22953 0.0 22936.5 3.8X count() 6657 / 6740 0.2 6656.6 13.5X ``` Closes #21892 ## How was this patch tested? It was tested by `CSVSuite` and `CSVBenchmarks` Author: Maxim Gekk Closes #21969 from MaxGekk/univocity-2_7_3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3f2911e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3f2911e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3f2911e Branch: refs/heads/master Commit: b3f2911eebeb418631ce296f68a7cc68083659cd Parents: 7cf16a7 Author: Maxim Gekk Authored: Fri Aug 3 08:33:28 2018 +0800 Committer: hyukjinkwon Committed: Fri Aug 3 08:33:28 2018 +0800 -- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- dev/deps/spark-deps-hadoop-3.1 | 2 +- sql/core/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 4ef61b2..54cdcfc 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -191,7 +191,7 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-2.6.3.jar +univocity-parsers-2.7.3.jar validation-api-1.1.0.Final.jar xbean-asm6-shaded-4.8.jar xercesImpl-2.9.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index a74ce1f..fda13db 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -192,7 +192,7 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-2.6.3.jar +univocity-parsers-2.7.3.jar validation-api-1.1.0.Final.jar xbean-asm6-shaded-4.8.jar xercesImpl-2.9.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/dev/deps/spark-deps-hadoop-3.1 -- diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1 index e0fcca0..90602fc 100644 --- a/dev/deps/spark-deps-hadoop-3.1 +++ b/dev/deps/spark-deps-hadoop-3.1 @@ -212,7 +212,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar token-provider-1.0.1.jar -univocity-parsers-2.6.3.jar +univocity-parsers-2.7.3.jar validation-api-1.1.0.Final.jar woodstox-core-5.0.3.jar xbean-asm6-shaded-4.8.jar 
http://git-wip-us.apache.org/repos/asf/spark/blob/b3f2911e/sql/core/pom.xml
spark git commit: [SPARK-24773] Avro: support logical timestamp type with different precisions
Repository: spark Updated Branches: refs/heads/master 29077a1d1 -> 7cf16a7fa [SPARK-24773] Avro: support logical timestamp type with different precisions ## What changes were proposed in this pull request? Support reading/writing Avro logical timestamp type with different precisions https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29 To specify the output timestamp type, use Dataframe option `outputTimestampType` or SQL config `spark.sql.avro.outputTimestampType`. The supported values are * `TIMESTAMP_MICROS` * `TIMESTAMP_MILLIS` The default output type is `TIMESTAMP_MICROS` ## How was this patch tested? Unit test Author: Gengliang Wang Closes #21935 from gengliangwang/avro_timestamp. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7cf16a7f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7cf16a7f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7cf16a7f Branch: refs/heads/master Commit: 7cf16a7fa4eb4145c0c5d1dd2555f78a2fdd8d8b Parents: 29077a1 Author: Gengliang Wang Authored: Fri Aug 3 08:32:08 2018 +0800 Committer: hyukjinkwon Committed: Fri Aug 3 08:32:08 2018 +0800 -- .../spark/sql/avro/AvroDeserializer.scala | 15 ++- .../apache/spark/sql/avro/AvroFileFormat.scala | 4 +- .../org/apache/spark/sql/avro/AvroOptions.scala | 11 ++ .../apache/spark/sql/avro/AvroSerializer.scala | 12 ++- .../spark/sql/avro/SchemaConverters.scala | 33 -- external/avro/src/test/resources/timestamp.avro | Bin 0 -> 375 bytes .../org/apache/spark/sql/avro/AvroSuite.scala | 107 +-- .../org/apache/spark/sql/internal/SQLConf.scala | 18 8 files changed, 178 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7cf16a7f/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala -- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index b31149a..394a62b 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} import org.apache.avro.Schema.Type._ import org.apache.avro.generic._ import org.apache.avro.util.Utf8 @@ -86,8 +87,18 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { case (LONG, LongType) => (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long]) - case (LONG, TimestampType) => (updater, ordinal, value) => -updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) + case (LONG, TimestampType) => avroType.getLogicalType match { +case _: TimestampMillis => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) +case _: TimestampMicros => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) +case null => (updater, ordinal, value) => + // For backward compatibility, if the Avro type is Long and it is not logical type, + // the value is processed as timestamp type with millisecond precision. 
+ updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) +case other => throw new IncompatibleSchemaException( + s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") + } case (LONG, DateType) => (updater, ordinal, value) => updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt) http://git-wip-us.apache.org/repos/asf/spark/blob/7cf16a7f/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala -- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 6776516..6ffcf37 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -113,8 +113,8 @@ private[avro] class AvroFileFormat extends FileFormat options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf()) -val outputAvroSchema =
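A hedged usage sketch of the option described above (spark-shell style; it assumes the built-in Avro module is on the classpath, and the output path is illustrative):

```scala
import java.sql.Timestamp
import spark.implicits._

val df = Seq(("event-1", Timestamp.valueOf("2018-08-03 13:28:44"))).toDF("name", "ts")

// Per-write option: store timestamps with millisecond precision
// (the default remains TIMESTAMP_MICROS).
df.write
  .format("avro")
  .option("outputTimestampType", "TIMESTAMP_MILLIS")
  .save("/tmp/avro-ts-demo")

// Equivalent session-wide setting via the SQL config named in the description:
spark.conf.set("spark.sql.avro.outputTimestampType", "TIMESTAMP_MILLIS")
```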
spark git commit: [SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with BarrierTaskContextImpl
Repository: spark Updated Branches: refs/heads/master bbdcc3bf6 -> 29077a1d1 [SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with BarrierTaskContextImpl ## What changes were proposed in this pull request? According to https://github.com/apache/spark/pull/21758#discussion_r206746905 , current declaration of `BarrierTaskContext` didn't extend methods from `TaskContext`. Since `TaskContext` is an abstract class and we don't want to change it to a trait, we have to define class `BarrierTaskContext` directly. ## How was this patch tested? Existing tests. Author: Xingbo Jiang Closes #21972 from jiangxb1987/BarrierTaskContext. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29077a1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29077a1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29077a1d Branch: refs/heads/master Commit: 29077a1d15e49dfafe7f2eab963830ba9cc6b29a Parents: bbdcc3b Author: Xingbo Jiang Authored: Thu Aug 2 17:19:42 2018 -0700 Committer: Xiangrui Meng Committed: Thu Aug 2 17:19:42 2018 -0700 -- .../org/apache/spark/BarrierTaskContext.scala | 60 +++- .../apache/spark/BarrierTaskContextImpl.scala | 49 .../scala/org/apache/spark/rdd/RDDBarrier.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 2 +- 4 files changed, 59 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 4c35862..ba30368 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -17,20 +17,71 @@ package org.apache.spark +import java.util.Properties + import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem /** A [[TaskContext]] with extra info and tooling for a barrier stage. */ -trait BarrierTaskContext extends TaskContext { +class BarrierTaskContext( +override val stageId: Int, +override val stageAttemptNumber: Int, +override val partitionId: Int, +override val taskAttemptId: Long, +override val attemptNumber: Int, +override val taskMemoryManager: TaskMemoryManager, +localProperties: Properties, +@transient private val metricsSystem: MetricsSystem, +// The default value is only used in tests. +override val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, + taskMemoryManager, localProperties, metricsSystem, taskMetrics) { /** * :: Experimental :: * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same * stage have reached this routine. + * + * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all + * possible code branches. Otherwise, you may get the job hanging or a SparkException after + * timeout. Some examples of misuses listed below: + * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it + * shall lead to timeout of the function call. 
+ * {{{ + * rdd.barrier().mapPartitions { (iter, context) => + * if (context.partitionId() == 0) { + * // Do nothing. + * } else { + * context.barrier() + * } + * iter + * } + * }}} + * + * 2. Include barrier() function in a try-catch code block, this may lead to timeout of the + * second function call. + * {{{ + * rdd.barrier().mapPartitions { (iter, context) => + * try { + * // Do something that might throw an Exception. + * doSomething() + * context.barrier() + * } catch { + * case e: Exception => logWarning("...", e) + * } + * context.barrier() + * iter + * } + * }}} */ @Experimental @Since("2.4.0") - def barrier(): Unit + def barrier(): Unit = { +// TODO SPARK-24817 implement global barrier. + } /** * :: Experimental :: @@ -38,5 +89,8 @@ trait BarrierTaskContext extends TaskContext { */ @Experimental @Since("2.4.0") - def getTaskInfos(): Array[BarrierTaskInfo] + def getTaskInfos(): Array[BarrierTaskInfo] = { +val addressesStr =
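Mirroring the documentation added above, a hedged sketch of correct usage: every task in the barrier stage reaches the same single `barrier()` call on every code path. `events` stands for any existing `RDD[T]`, and the two-argument closure form follows the API shape documented in this patch.

```scala
val processed = events.barrier().mapPartitions { (iter, context) =>
  val rows = iter.toArray   // stage-local work before the synchronization point
  context.barrier()         // every task in the stage must reach this exact call
  rows.iterator             // continue only after all tasks have arrived
}
```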
spark git commit: [SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments"
Repository: spark Updated Branches: refs/heads/master d0bc3ed67 -> bbdcc3bf6 [SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments" ## What changes were proposed in this pull request? This PR refactors code to get a value for "spark.sql.codegen.comments" by avoiding `SparkEnv.get.conf`. This PR uses `SQLConf.get.codegenComments` since `SQLConf.get` always returns an instance of `SQLConf`. ## How was this patch tested? Added test case to `DebuggingSuite` Author: Kazuaki Ishizaki Closes #19449 from kiszk/SPARK-22219. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbdcc3bf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbdcc3bf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbdcc3bf Branch: refs/heads/master Commit: bbdcc3bf61da39704650d4570c6307b5a46f7100 Parents: d0bc3ed Author: Kazuaki Ishizaki Authored: Thu Aug 2 18:19:04 2018 -0500 Committer: Sean Owen Committed: Thu Aug 2 18:19:04 2018 -0500 -- .../expressions/codegen/CodeGenerator.scala | 7 +-- .../org/apache/spark/sql/internal/SQLConf.scala | 2 ++ .../apache/spark/sql/internal/StaticSQLConf.scala | 8 .../sql/internal/ExecutorSideSQLConfSuite.scala | 16 4 files changed, 27 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 05500f5..498dd26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1173,12 +1173,7 @@ class CodegenContext { text: => String, placeholderId: String = "", force: Boolean = false): Block = { -// By default, disable comments in generated code because computing the comments themselves can -// be extremely expensive in certain cases, such as deeply-nested expressions which operate over -// inputs with wide schemas. For more details on the performance issues that motivated this -// flat, see SPARK-15680. 
-if (force || - SparkEnv.get != null && SparkEnv.get.conf.getBoolean("spark.sql.codegen.comments", false)) { +if (force || SQLConf.get.codegenComments) { val name = if (placeholderId != "") { assert(!placeHolderToComments.contains(placeholderId)) placeholderId http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index edc1a48..8f303f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1599,6 +1599,8 @@ class SQLConf extends Serializable with Logging { def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) + def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS) + def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT) http://git-wip-us.apache.org/repos/asf/spark/blob/bbdcc3bf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 384b191..d9c354b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -74,6 +74,14 @@ object StaticSQLConf { .checkValue(maxEntries => maxEntries >= 0, "The maximum must not be negative") .createWithDefault(100) + val CODEGEN_COMMENTS = buildStaticConf("spark.sql.codegen.comments") +.internal() +.doc("When true, put comment in the generated code. Since computing huge comments " + + "can be extremely expensive in certain cases, such as deeply-nested expressions which " + + "operate over inputs with wide schemas, default is false.") +.booleanConf +
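A hedged sketch of how the flag is intended to be used after this change; since it becomes a static SQL configuration, it is set when the session is created rather than via `spark.conf.set` afterwards.

```scala
import org.apache.spark.sql.SparkSession

// Static conf: supply it at session build time (default is false for performance).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.codegen.comments", "true")
  .getOrCreate()

// Generated code for this plan now carries comments describing each expression.
spark.range(10).selectExpr("id + 1 AS next").queryExecution.debug.codegen()
```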
svn commit: r28520 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_16_02-d0bc3ed-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Aug 2 23:16:01 2018 New Revision: 28520 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_02_16_02-d0bc3ed docs [This commit notification would consist of 1469 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query
Repository: spark Updated Branches: refs/heads/master efef55388 -> d0bc3ed67 [SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query ## What changes were proposed in this pull request? `Uuid`'s results depend on random seed given during analysis. Thus under streaming query, we will have the same uuids in each execution. This seems to be incorrect for streaming query execution. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh Closes #21854 from viirya/uuid_in_streaming. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0bc3ed6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0bc3ed6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0bc3ed6 Branch: refs/heads/master Commit: d0bc3ed6797e0c06f688b7b2ef6c26282a25b175 Parents: efef553 Author: Liang-Chi Hsieh Authored: Thu Aug 2 15:35:46 2018 -0700 Committer: Shixiong Zhu Committed: Thu Aug 2 15:35:46 2018 -0700 -- .../streaming/IncrementalExecution.scala| 8 ++- .../sql/streaming/StreamingQuerySuite.scala | 22 +++- 2 files changed, 28 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 6ae7f28..e9ffe12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import scala.util.Random + import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp +import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule @@ -73,10 +75,14 @@ class IncrementalExecution( * with the desired literal */ override lazy val optimizedPlan: LogicalPlan = { +val random = new Random() + sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions { case ts @ CurrentBatchTimestamp(timestamp, _, _) => logInfo(s"Current batch timestamp = $timestamp") ts.toLiteral + // SPARK-24896: Set the seed for random number generation in Uuid expressions. 
+ case _: Uuid => Uuid(Some(random.nextLong())) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 78199b0..f37f368 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -21,6 +21,8 @@ import java.{util => ju} import java.util.Optional import java.util.concurrent.CountDownLatch +import scala.collection.mutable + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter @@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter import org.apache.spark.sql.functions._ @@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckLastBatch(("A", 1))) } + test("Uuid in streaming query should not produce same uuids in each execution") { +val uuids = mutable.ArrayBuffer[String]() +def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] => + rows.foreach(r
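A hedged spark-shell sketch of what the fix guarantees: across micro-batches, `uuid()` values are no longer replayed from a seed fixed once at analysis time, because each incremental execution re-seeds the expression. Source and sink choices below are illustrative.

```scala
import org.apache.spark.sql.functions.expr

// One input row per second; each batch attaches a generated uuid column.
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .withColumn("u", expr("uuid()"))

// With this fix, the uuid values differ from batch to batch.
val query = stream.writeStream.format("console").start()
query.awaitTermination(10000)  // run for ~10s for the demo
query.stop()
```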
spark git commit: [SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused
Repository: spark Updated Branches: refs/heads/master 02f967795 -> efef55388 [SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused ## What changes were proposed in this pull request? In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange`. Then, `ReuseExchange` removes some duplicate exchange and the actual number of registered exchanges changes. Finally, the assertion in `ExchangeCoordinator` fails because the logical number of exchanges and the actual number of registered exchanges become different; https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201 This pr fixed the issue and the code to reproduce this is as follows; ``` scala> sql("SET spark.sql.adaptive.enabled=true") scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1") scala> val df = spark.range(1).selectExpr("id AS key", "id AS value") scala> val resultDf = df.join(df, "key").join(df, "key") scala> resultDf.show ... at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 101 more Caused by: java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201) at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... ``` ## How was this patch tested? Added tests in `ExchangeCoordinatorSuite`. Author: Takeshi Yamamuro Closes #21754 from maropu/SPARK-24705-2. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efef5538 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efef5538 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efef5538 Branch: refs/heads/master Commit: efef55388fedef3f7954a385776e666ad4597a58 Parents: 02f9677 Author: Takeshi Yamamuro Authored: Thu Aug 2 13:05:36 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 13:05:36 2018 -0700 -- .../execution/exchange/EnsureRequirements.scala | 1 - .../exchange/ExchangeCoordinator.scala | 17 ++-- .../execution/ExchangeCoordinatorSuite.scala| 21 3 files changed, 28 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index d96ecba..d2d5011 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -82,7 +82,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { if (adaptiveExecutionEnabled && supportsCoordinator) { val coordinator = new ExchangeCoordinator( -children.length, targetPostShuffleInputSize, minNumPostShufflePartitions) children.zip(requiredChildDistributions).map { http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index 051e610..f5d93ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -83,7 +83,6 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB) */ class ExchangeCoordinator( -numExchanges: Int, advisoryTargetPostShuffleInputSize: Long, minNumPostShufflePartitions: Option[Int] = None) extends Logging { @@
spark git commit: [SPARK-23908][SQL] Add transform function.
Repository: spark Updated Branches: refs/heads/master 0df6bf882 -> 02f967795 [SPARK-23908][SQL] Add transform function. ## What changes were proposed in this pull request? This pr adds `transform` function which transforms elements in an array using the function. Optionally we can take the index of each element as the second argument. ```sql > SELECT transform(array(1, 2, 3), x -> x + 1); array(2, 3, 4) > SELECT transform(array(1, 2, 3), (x, i) -> x + i); array(1, 3, 5) ``` ## How was this patch tested? Added tests. Author: Takuya UESHIN Closes #21954 from ueshin/issues/SPARK-23908/transform. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02f96779 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02f96779 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02f96779 Branch: refs/heads/master Commit: 02f967795b7e8ccf2738d567928e47c38c1134e1 Parents: 0df6bf8 Author: Takuya UESHIN Authored: Thu Aug 2 13:00:33 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 13:00:33 2018 -0700 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 + .../spark/sql/catalyst/analysis/Analyzer.scala | 3 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../analysis/higherOrderFunctions.scala | 166 +++ .../expressions/higherOrderFunctions.scala | 212 +++ .../spark/sql/catalyst/parser/AstBuilder.scala | 10 + .../analysis/ResolveLambdaVariablesSuite.scala | 89 .../expressions/HigherOrderFunctionsSuite.scala | 97 + .../catalyst/parser/ExpressionParserSuite.scala | 5 + .../spark/sql/catalyst/plans/PlanTest.scala | 2 + .../sql-tests/inputs/higher-order-functions.sql | 26 +++ .../results/higher-order-functions.sql.out | 81 +++ .../spark/sql/DataFrameFunctionsSuite.scala | 153 + 13 files changed, 847 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 2aca10f..9ad6f30 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -591,6 +591,8 @@ primaryExpression (OVER windowSpec)? #functionCall | qualifiedName '(' trimOption=(BOTH | LEADING | TRAILING) argument+=expression FROM argument+=expression ')' #functionCall +| IDENTIFIER '->' expression #lambda +| '(' IDENTIFIER (',' IDENTIFIER)+ ')' '->' expression #lambda | value=primaryExpression '[' index=valueExpression ']' #subscript | identifier #columnReference | base=primaryExpression '.' 
fieldName=identifier #dereference http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 76dc867..7f235ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -180,6 +180,8 @@ class Analyzer( ResolveAggregateFunctions :: TimeWindowing :: ResolveInlineTables(conf) :: + ResolveHigherOrderFunctions(catalog) :: + ResolveLambdaVariables(conf) :: ResolveTimeZone(conf) :: ResolveRandomSeed :: TypeCoercion.typeCoercionRules(conf) ++ @@ -878,6 +880,7 @@ class Analyzer( } private def resolve(e: Expression, q: LogicalPlan): Expression = e match { + case f: LambdaFunction if !f.bound => f case u @ UnresolvedAttribute(nameParts) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. val result = http://git-wip-us.apache.org/repos/asf/spark/blob/02f96779/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
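A hedged spark-shell sketch of the new higher-order function, mirroring the SQL examples in the description above (`spark` is the predefined session):

```scala
// Element-wise transform over an array column, with and without the element index.
spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1) AS plus_one").show(false)
// expected value: [2, 3, 4]

spark.sql("SELECT transform(array(1, 2, 3), (x, i) -> x + i) AS plus_index").show(false)
// expected value: [1, 3, 5]
```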
svn commit: r28518 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_12_02-0df6bf8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Aug 2 19:16:55 2018 New Revision: 28518 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_02_12_02-0df6bf8 docs [This commit notification would consist of 1469 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [BUILD] Fix lint-python.
Repository: spark Updated Branches: refs/heads/master 38e4699c9 -> 0df6bf882 [BUILD] Fix lint-python. ## What changes were proposed in this pull request? This pr fixes lint-python. ``` ./python/pyspark/accumulators.py:231:9: E306 expected 1 blank line before a nested definition, found 0 ./python/pyspark/accumulators.py:257:101: E501 line too long (107 > 100 characters) ./python/pyspark/accumulators.py:264:1: E302 expected 2 blank lines, found 1 ./python/pyspark/accumulators.py:281:1: E302 expected 2 blank lines, found 1 ``` ## How was this patch tested? Executed lint-python manually. Author: Takuya UESHIN Closes #21973 from ueshin/issues/build/1/fix_lint-python. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6bf88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6bf88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6bf88 Branch: refs/heads/master Commit: 0df6bf882907d7d76572f513168a144067d0e0ec Parents: 38e4699 Author: Takuya UESHIN Authored: Fri Aug 3 03:18:46 2018 +0900 Committer: Takuya UESHIN Committed: Fri Aug 3 03:18:46 2018 +0900 -- python/pyspark/accumulators.py | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0df6bf88/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 1276c31..30ad042 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,6 +228,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry auth_token = self.server.auth_token + def poll(func): while not self.server.server_shutdown: # Poll every 1 second for new data -- don't block in case of shutdown. @@ -254,13 +255,15 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): # we've authenticated, we can break out of the first loop now return True else: -raise Exception("The value of the provided token to the AccumulatorServer is not correct.") +raise Exception( +"The value of the provided token to the AccumulatorServer is not correct.") # first we keep polling till we've received the authentication token poll(authenticate_and_accum_updates) # now we've authenticated, don't need to check for the token anymore poll(accum_updates) + class AccumulatorServer(SocketServer.TCPServer): def __init__(self, server_address, RequestHandlerClass, auth_token): @@ -278,6 +281,7 @@ class AccumulatorServer(SocketServer.TCPServer): SocketServer.TCPServer.shutdown(self) self.server_close() + def _start_update_server(auth_token): """Start a TCP server to receive accumulator updates in a daemon thread, and returns it""" server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler, auth_token) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24820][SPARK-24821][CORE] Fail fast when a submitted job contains a barrier stage with an unsupported RDD chain pattern
Repository: spark Updated Branches: refs/heads/master ad2e63662 -> 38e4699c9 [SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern ## What changes were proposed in this pull request? Check on job submit to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The following patterns are not supported: - Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD); - An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). ## How was this patch tested? Add test cases in `BarrierStageOnSubmittedSuite`. Author: Xingbo Jiang Closes #21927 from jiangxb1987/SPARK-24820. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e4699c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e4699c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e4699c Branch: refs/heads/master Commit: 38e4699c978e56a0f24b8efb94fd3206cdd8b3fe Parents: ad2e636 Author: Xingbo Jiang Authored: Thu Aug 2 09:36:26 2018 -0700 Committer: Xiangrui Meng Committed: Thu Aug 2 09:36:26 2018 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 55 ++- .../spark/BarrierStageOnSubmittedSuite.scala| 153 +++ 2 files changed, 207 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4858af7..3dd0718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.{RDD, RDDCheckpointData} +import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat @@ -341,6 +341,22 @@ class DAGScheduler( } /** + * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The + * following patterns are not supported: + * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. + * union()/coalesce()/first()/take()/PartitionPruningRDD); + * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). + */ + private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = { +val predicate: RDD[_] => Boolean = (r => + r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1) +if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) { + throw new SparkException( + DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) +} + } + + /** * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. 
If a * previously run stage generated the same shuffle data, this function will copy the output * locations that are still available from the previous shuffle to avoid unnecessarily @@ -348,6 +364,7 @@ class DAGScheduler( */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd +checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -376,6 +393,7 @@ class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { +checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) @@ -451,6 +469,32 @@ class DAGScheduler( parents } + /** + * Traverses the given RDD and its ancestors within the same stage and checks whether all of the + * RDDs satisfy a given predicate. + */ + private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = { +val visited = new HashSet[RDD[_]] +val waitingForVisit = new
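The two unsupported patterns above can be illustrated with a short snippet. The following sketch (assuming a running SparkContext `sc` and the Spark 2.4 barrier API) shows jobs that are now expected to fail fast at submission with the new SparkException; it is illustrative only and not part of the patch:
```
// Illustrative sketch only (assumes a running SparkContext `sc`); both jobs below are
// expected to fail fast at submit time after this change.
val barrierRdd = sc.parallelize(1 to 100, 4).barrier().mapPartitions(iter => iter)

// Pattern 1: an ancestor RDD whose partition count differs from the resulting RDD
// (coalesce shrinks 4 partitions down to 2).
barrierRdd.coalesce(2).collect()

// Pattern 2: an RDD that depends on more than one barrier RDD.
val otherBarrierRdd = sc.parallelize(1 to 100, 4).barrier().mapPartitions(iter => iter)
barrierRdd.zip(otherBarrierRdd).collect()
```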
spark git commit: [SPARK-24598][DOCS] State in the documentation the behavior when arithmetic operations cause overflow
Repository: spark Updated Branches: refs/heads/master 15fc23722 -> ad2e63662 [SPARK-24598][DOCS] State in the documentation the behavior when arithmetic operations cause overflow ## What changes were proposed in this pull request? According to the discussion in https://github.com/apache/spark/pull/21599, changing the behavior of arithmetic operations so that they can check for overflow is not nice in a minor release. What we can do for 2.4 is warn users about the current behavior in the documentation, so that they are aware of the issue and can take proper actions. ## How was this patch tested? NA Author: Marco Gaido Closes #21967 from mgaido91/SPARK-24598_doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad2e6366 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad2e6366 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad2e6366 Branch: refs/heads/master Commit: ad2e63662885b67b1e94030b13fdae4f7366dc4a Parents: 15fc237 Author: Marco Gaido Authored: Thu Aug 2 09:28:13 2018 -0700 Committer: Xiao Li Committed: Thu Aug 2 09:28:13 2018 -0700 -- docs/sql-programming-guide.md | 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad2e6366/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 5f1eee8..0900f83 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -3072,3 +3072,10 @@ Specifically: - In aggregations, all NaN values are grouped together. - NaN is treated as a normal value in join keys. - NaN values go last when in ascending order, larger than any other numeric value. + + ## Arithmetic operations + +Operations performed on numeric types (with the exception of `decimal`) are not checked for overflow. +This means that in case an operation causes an overflow, the result is the same that the same operation +returns in a Java/Scala program (eg. if the sum of 2 integers is higher than the maximum value representable, +the result is a negative number). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
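For illustration, a minimal sketch of the documented behavior (assuming a SparkSession named `spark`): integer arithmetic silently wraps on overflow instead of raising an error.
```
// Sketch of the documented behavior; not part of the patch itself.
import spark.implicits._

val df = Seq(Int.MaxValue).toDF("i")
// Both operands are integers, so the sum overflows Int and wraps around: the result
// is expected to be Int.MinValue (-2147483648) rather than an error.
df.selectExpr("i + 1").show()
```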
svn commit: r28514 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_08_02-f04cd67-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Aug 2 15:16:27 2018 New Revision: 28514 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_02_08_02-f04cd67 docs [This commit notification would consist of 1469 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [MINOR] remove dead code in ExpressionEvalHelper
Repository: spark Updated Branches: refs/heads/master d182b3d34 -> f04cd6709 [MINOR] remove dead code in ExpressionEvalHelper ## What changes were proposed in this pull request? This addresses https://github.com/apache/spark/pull/21236/files#r207078480 both https://github.com/apache/spark/pull/21236 and https://github.com/apache/spark/pull/21838 add a InternalRow result check to ExpressionEvalHelper and becomes duplicated. ## How was this patch tested? N/A Author: Wenchen Fan Closes #21958 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f04cd670 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f04cd670 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f04cd670 Branch: refs/heads/master Commit: f04cd670943d0eb6eb688a0f50d56293cda554ef Parents: d182b3d Author: Wenchen Fan Authored: Thu Aug 2 09:26:27 2018 -0500 Committer: Sean Owen Committed: Thu Aug 2 09:26:27 2018 -0500 -- .../spark/sql/catalyst/expressions/ExpressionEvalHelper.scala | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f04cd670/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index d045267..6684e5c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -105,9 +105,6 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa if (expected.isNaN) result.isNaN else expected == result case (result: Float, expected: Float) => if (expected.isNaN) result.isNaN else expected == result - case (result: UnsafeRow, expected: GenericInternalRow) => -val structType = exprDataType.asInstanceOf[StructType] -result.toSeq(structType) == expected.toSeq(structType) case (result: Row, expected: InternalRow) => result.toSeq == expected.toSeq(result.schema) case _ => result == expected - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24742] Fix NullPointerException in Field Metadata
Repository: spark Updated Branches: refs/heads/master 7be6fc3c7 -> d182b3d34 [SPARK-24742] Fix NullPointerexception in Field Metadata ## What changes were proposed in this pull request? This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull" ## How was this patch tested? A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala Author: Kaya Kupferschmidt Closes #21722 from kupferk/SPARK-24742. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d182b3d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d182b3d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d182b3d3 Branch: refs/heads/master Commit: d182b3d34d6afade401b8a455b774059bae9d90f Parents: 7be6fc3 Author: Kaya Kupferschmidt Authored: Thu Aug 2 22:23:24 2018 +0800 Committer: hyukjinkwon Committed: Thu Aug 2 22:23:24 2018 +0800 -- -- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24742] Fix NullPointerException in Field Metadata
Repository: spark Updated Branches: refs/heads/master 46110a589 -> 7be6fc3c7 [SPARK-24742] Fix NullPointerexception in Field Metadata ## What changes were proposed in this pull request? This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull" ## How was this patch tested? A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala Author: Kaya Kupferschmidt Closes #21722 from kupferk/SPARK-24742. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7be6fc3c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7be6fc3c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7be6fc3c Branch: refs/heads/master Commit: 7be6fc3c77b00f0eefd276676524ec4e36bab868 Parents: 46110a5 Author: Kaya Kupferschmidt Authored: Thu Aug 2 09:22:21 2018 -0500 Committer: Sean Owen Committed: Thu Aug 2 09:22:21 2018 -0500 -- .../org/apache/spark/sql/types/Metadata.scala | 2 + .../apache/spark/sql/types/MetadataSuite.scala | 74 2 files changed, 76 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7be6fc3c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala index 352fb54..7c15dc0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -215,6 +215,8 @@ object Metadata { x.## case x: Metadata => hash(x.map) + case null => +0 case other => throw new RuntimeException(s"Do not support type ${other.getClass}.") } http://git-wip-us.apache.org/repos/asf/spark/blob/7be6fc3c/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala new file mode 100644 index 000..210e657 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.types + +import org.apache.spark.SparkFunSuite + +class MetadataSuite extends SparkFunSuite { + test("String Metadata") { +val meta = new MetadataBuilder().putString("key", "value").build() +assert(meta === meta) +assert(meta.## !== 0) +assert(meta.getString("key") === "value") +assert(meta.contains("key")) +intercept[NoSuchElementException](meta.getString("no_such_key")) +intercept[ClassCastException](meta.getBoolean("key")) + } + + test("Long Metadata") { +val meta = new MetadataBuilder().putLong("key", 12).build() +assert(meta === meta) +assert(meta.## !== 0) +assert(meta.getLong("key") === 12) +assert(meta.contains("key")) +intercept[NoSuchElementException](meta.getLong("no_such_key")) +intercept[ClassCastException](meta.getBoolean("key")) + } + + test("Double Metadata") { +val meta = new MetadataBuilder().putDouble("key", 12).build() +assert(meta === meta) +assert(meta.## !== 0) +assert(meta.getDouble("key") === 12) +assert(meta.contains("key")) +intercept[NoSuchElementException](meta.getDouble("no_such_key")) +intercept[ClassCastException](meta.getBoolean("key")) + } + + test("Boolean Metadata") { +val meta = new MetadataBuilder().putBoolean("key", true).build() +assert(meta === meta) +assert(meta.## !== 0) +assert(meta.getBoolean("key") === true) +assert(meta.contains("key")) +intercept[NoSuchElementException](meta.getBoolean("no_such_key")) +
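For reference, a minimal sketch of the failure mode being fixed (hypothetical usage, assuming spark-catalyst on the classpath):
```
import org.apache.spark.sql.types.MetadataBuilder

// Build a Metadata instance that contains a null value.
val meta = new MetadataBuilder().putNull("key").build()

// Before this patch, Metadata.hash fell through to the catch-all case and
// dereferenced the null value, so computing the hash code threw; with the
// added null case, a null value simply contributes a hash of 0.
println(meta.##)
assert(meta.contains("key"))
```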
spark git commit: [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node
Repository: spark Updated Branches: refs/heads/master a65736996 -> 46110a589 [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node ## What changes were proposed in this pull request? Remove the AnalysisBarrier LogicalPlan node, which is useless now. ## How was this patch tested? N/A Author: Xiao Li Closes #21962 from gatorsmile/refactor2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46110a58 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46110a58 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46110a58 Branch: refs/heads/master Commit: 46110a589f4e91cd7605c5a2c34c3db6b2635830 Parents: a657369 Author: Xiao Li Authored: Thu Aug 2 22:20:41 2018 +0800 Committer: hyukjinkwon Committed: Thu Aug 2 22:20:41 2018 +0800 -- .../plans/logical/basicLogicalOperators.scala | 20 .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../spark/sql/execution/CacheManager.scala | 2 +- 3 files changed, 2 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 13b5130..68413d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -924,23 +924,3 @@ case class Deduplicate( override def output: Seq[Attribute] = child.output } - -/** - * A logical plan for setting a barrier of analysis. - * - * The SQL Analyzer goes through a whole query plan even most part of it is analyzed. This - * increases the time spent on query analysis for long pipelines in ML, especially. - * - * This logical plan wraps an analyzed logical plan to prevent it from analysis again. The barrier - * is applied to the analyzed logical plan in Dataset. It won't change the output of wrapped - * logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset - * will be put on the barrier, so only the new nodes created will be analyzed. - * - * This analysis barrier will be removed at the end of analysis stage. 
- */ -case class AnalysisBarrier(child: LogicalPlan) extends LeafNode { - override protected def innerChildren: Seq[LogicalPlan] = Seq(child) - override def output: Seq[Attribute] = child.output - override def isStreaming: Boolean = child.isStreaming - override def doCanonicalize(): LogicalPlan = child.canonicalized -} http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 3c9e743..cd7dc2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} http://git-wip-us.apache.org/repos/asf/spark/blob/46110a58/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index ed130dc..c992993 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, SparkSession} import
spark git commit: [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
Repository: spark Updated Branches: refs/heads/master 275415777 -> a65736996 [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support ## What changes were proposed in this pull request? This PR addresses issues 2 and 3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk). * We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2). * We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work here (we are not going to eliminate either of the methods). The compiler tries to adapt to Unit and makes these two methods candidates for overloading; when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose, as we need to support two different uses of the method `addTaskCompletionListener`: one that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3). Note: regarding issue 1 in the doc the plan is: > Do Nothing. Don't try to fix this as this is only a problem for Java users > who would want to use 2.11 binaries. In that case they can cast to > MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be > simplified so that this issue is removed. ## How was this patch tested? This was manually tested: ```./dev/change-scala-version.sh 2.12 ./build/mvn -DskipTests -Pscala-2.12 clean package ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None``` Author: Stavros Kontopoulos Closes #21930 from skonto/scala2.12-sup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6573699 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6573699 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6573699 Branch: refs/heads/master Commit: a65736996b2b506f61cc8d599ec9f4c52a1b5312 Parents: 2754157 Author: Stavros Kontopoulos Authored: Thu Aug 2 09:17:09 2018 -0500 Committer: Sean Owen Committed: Thu Aug 2 09:17:09 2018 -0500 -- .../scala/org/apache/spark/TaskContext.scala| 5 +- .../apache/spark/api/python/PythonRunner.scala | 2 +- .../spark/broadcast/TorrentBroadcast.scala | 2 +- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- .../scala/org/apache/spark/rdd/JdbcRDD.scala| 2 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 2 +- .../spark/rdd/ReliableCheckpointRDD.scala | 2 +- .../spark/shuffle/BlockStoreShuffleReader.scala | 2 +- .../storage/ShuffleBlockFetcherIterator.scala | 2 +- .../spark/storage/memory/MemoryStore.scala | 2 +- .../org/apache/spark/util/ClosureCleaner.scala | 278 --- .../util/collection/ExternalAppendOnlyMap.scala | 2 +- .../apache/spark/util/ClosureCleanerSuite.scala | 3 + .../spark/util/ClosureCleanerSuite2.scala | 53 +++- .../apache/spark/sql/avro/AvroFileFormat.scala | 2 +- .../spark/sql/kafka010/KafkaSourceRDD.scala | 2 +- .../spark/streaming/kafka010/KafkaRDD.scala | 2 +- .../spark/ml/source/libsvm/LibSVMRelation.scala | 2 +- .../aggregate/TungstenAggregationIterator.scala | 2 +- .../sql/execution/arrow/ArrowConverters.scala | 4 +- .../columnar/InMemoryTableScanExec.scala| 2 +- .../execution/datasources/CodecStreams.scala| 2 +- .../sql/execution/datasources/FileScanRDD.scala | 2 +- .../datasources/csv/CSVDataSource.scala | 2 +- .../execution/datasources/jdbc/JDBCRDD.scala| 2 +- .../datasources/json/JsonDataSource.scala | 2 +- .../datasources/orc/OrcFileFormat.scala | 4 +- .../datasources/parquet/ParquetFileFormat.scala | 4 +- .../datasources/text/TextFileFormat.scala | 2 +- .../datasources/v2/DataSourceRDD.scala | 2 +- .../spark/sql/execution/joins/HashJoin.scala| 2 +- .../execution/joins/ShuffledHashJoinExec.scala | 2 +- .../python/AggregateInPandasExec.scala | 2 +- .../execution/python/ArrowPythonRunner.scala| 2 +- .../sql/execution/python/EvalPythonExec.scala | 2 +- .../execution/python/PythonForeachWriter.scala | 2 +- .../execution/python/WindowInPandasExec.scala | 2 +- .../continuous/ContinuousCoalesceRDD.scala | 5 +- .../continuous/ContinuousQueuedDataReader.scala | 2 +-
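To make the `addTaskCompletionListener` point above concrete, here is a hedged sketch of the two call styles that have to coexist (assuming the Spark 2.4 `TaskContext` API; the polymorphic overload is the workaround described in the doc):
```
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

def register(ctx: TaskContext): Unit = {
  // Style 1: pass an explicit TaskCompletionListener.
  ctx.addTaskCompletionListener(new TaskCompletionListener {
    override def onTaskCompletion(context: TaskContext): Unit =
      println(s"task ${context.taskAttemptId()} done")
  })

  // Style 2: pass a plain closure; with the polymorphic overload
  // addTaskCompletionListener[U](f: TaskContext => U), the Unit-returning
  // closure no longer causes an overloading ambiguity under Scala 2.12.
  ctx.addTaskCompletionListener[Unit] { context =>
    println(s"task ${context.taskAttemptId()} done")
  }
}
```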
spark git commit: [SPARK-24795][CORE][FOLLOWUP] Kill all running tasks when a task in a barrier stage fails
Repository: spark Updated Branches: refs/heads/master 57d994994 -> 275415777 [SPARK-24795][CORE][FOLLOWUP] Kill all running tasks when a task in a barrier stage fail ## What changes were proposed in this pull request? Kill all running tasks when a task in a barrier stage fail in the middle. `TaskScheduler`.`cancelTasks()` will also fail the job, so we implemented a new method `killAllTaskAttempts()` to just kill all running tasks of a stage without cancel the stage/job. ## How was this patch tested? Add new test cases in `TaskSchedulerImplSuite`. Author: Xingbo Jiang Closes #21943 from jiangxb1987/killAllTasks. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27541577 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27541577 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27541577 Branch: refs/heads/master Commit: 275415777b84b82aa5409e6577e1efaff1d989e7 Parents: 57d9949 Author: Xingbo Jiang Authored: Thu Aug 2 20:54:36 2018 +0800 Committer: Wenchen Fan Committed: Thu Aug 2 20:54:36 2018 +0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 14 +++-- .../apache/spark/scheduler/TaskScheduler.scala | 8 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 34 +++ .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++ .../scheduler/ExternalClusterManagerSuite.scala | 2 + .../scheduler/TaskSchedulerImplSuite.scala | 62 6 files changed, 109 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/27541577/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 003d64f..4858af7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1433,17 +1433,18 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + "failed.") -val message = s"Stage failed because barrier task $task finished unsuccessfully. " + +val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + failure.toErrorString try { - // cancelTasks will fail if a SchedulerBackend does not implement killTask - taskScheduler.cancelTasks(stageId, interruptThread = false) + // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask. + val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed." + taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason) } catch { case e: UnsupportedOperationException => // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. // TODO SPARK-24877 leave the zombie tasks and ignore their completion events. -logWarning(s"Could not cancel tasks for stage $stageId", e) -abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + +logWarning(s"Could not kill all tasks for stage $stageId", e) +abortStage(failedStage, "Could not kill zombie barrier tasks for stage " + s"$failedStage (${failedStage.name})", Some(e)) } markStageAsFinished(failedStage, Some(message)) @@ -1457,7 +1458,8 @@ class DAGScheduler( if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { -"Barrier stage will not retry stage due to testing config" +"Barrier stage will not retry stage due to testing config. 
Most recent failure " + + s"reason: $message" } else { s"""$failedStage (${failedStage.name}) |has failed the maximum allowable number of http://git-wip-us.apache.org/repos/asf/spark/blob/27541577/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 90644fe..95f7ae4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -51,16 +51,22 @@ private[spark] trait TaskScheduler { // Submit a sequence of tasks to run. def submitTasks(taskSet: TaskSet): Unit - // Cancel a stage. + // Kill
svn commit: r28508 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_02_00_02-57d9949-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Aug 2 07:16:48 2018 New Revision: 28508 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_02_00_02-57d9949 docs [This commit notification would consist of 1469 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24557][ML] ClusteringEvaluator support array input
Repository: spark Updated Branches: refs/heads/master 166f34618 -> 57d994994 [SPARK-24557][ML] ClusteringEvaluator support array input ## What changes were proposed in this pull request? ClusteringEvaluator support array input ## How was this patch tested? added tests Author: zhengruifeng Closes #21563 from zhengruifeng/clu_eval_support_array. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57d99499 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57d99499 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57d99499 Branch: refs/heads/master Commit: 57d994994d27154f57f2724924c42beb2ab2e0e7 Parents: 166f346 Author: zhengruifeng Authored: Wed Aug 1 23:46:01 2018 -0700 Committer: Xiangrui Meng Committed: Wed Aug 1 23:46:01 2018 -0700 -- .../spark/ml/evaluation/ClusteringEvaluator.scala| 15 +-- .../ml/evaluation/ClusteringEvaluatorSuite.scala | 15 ++- 2 files changed, 23 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/57d99499/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 4353c46..a6d6b4e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -21,11 +21,10 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.attribute.AttributeGroup -import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} -import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, - SchemaUtils} +import org.apache.spark.ml.util._ import org.apache.spark.sql.{Column, DataFrame, Dataset} import org.apache.spark.sql.functions.{avg, col, udf} import org.apache.spark.sql.types.DoubleType @@ -107,15 +106,19 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str @Since("2.3.0") override def evaluate(dataset: Dataset[_]): Double = { -SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) +SchemaUtils.validateVectorCompatibleColumn(dataset.schema, $(featuresCol)) SchemaUtils.checkNumericType(dataset.schema, $(predictionCol)) +val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol)) +val df = dataset.select(col($(predictionCol)), + vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata)) + ($(metricName), $(distanceMeasure)) match { case ("silhouette", "squaredEuclidean") => SquaredEuclideanSilhouette.computeSilhouetteScore( - dataset, $(predictionCol), $(featuresCol)) + df, $(predictionCol), $(featuresCol)) case ("silhouette", "cosine") => -CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol)) +CosineSilhouette.computeSilhouetteScore(df, $(predictionCol), $(featuresCol)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/57d99499/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala -- diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index 2c175ff..e2d7756 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.Dataset @@ -33,10 +33,17 @@ class ClusteringEvaluatorSuite import testImplicits._ @transient var irisDataset: Dataset[_] = _ + @transient var newIrisDataset: Dataset[_] = _ +
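As a usage sketch (assuming a SparkSession named `spark`), the evaluator is now expected to accept an array-typed features column in addition to a Vector column:
```
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Tiny toy dataset with array-typed features (illustrative values only).
val df = spark.createDataFrame(Seq(
  (0, Array(0.0, 0.1)),
  (0, Array(0.1, 0.0)),
  (1, Array(9.0, 9.1)),
  (1, Array(9.1, 9.0))
)).toDF("prediction", "features")

val evaluator = new ClusteringEvaluator()
  .setPredictionCol("prediction")
  .setFeaturesCol("features")

// The default metric is the silhouette score; for this toy data it should be close to 1.0.
println(evaluator.evaluate(df))
```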
spark git commit: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
Repository: spark Updated Branches: refs/heads/master c9914cf04 -> 166f34618 [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE ## What changes were proposed in this pull request? This PR is to refactor the code in AVERAGE by dsl. ## How was this patch tested? N/A Author: Xiao Li Closes #21951 from gatorsmile/refactor1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/166f3461 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/166f3461 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/166f3461 Branch: refs/heads/master Commit: 166f346185cc0b27a7e2b2a3b42df277e5901f2f Parents: c9914cf Author: Xiao Li Authored: Wed Aug 1 23:00:17 2018 -0700 Committer: Xiao Li Committed: Wed Aug 1 23:00:17 2018 -0700 -- .../scala/org/apache/spark/sql/catalyst/dsl/package.scala | 1 + .../sql/catalyst/expressions/aggregate/Average.scala | 10 -- 2 files changed, 5 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/166f3461/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 89e8c99..9870854 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -166,6 +166,7 @@ package object dsl { def maxDistinct(e: Expression): Expression = Max(e).toAggregateExpression(isDistinct = true) def upper(e: Expression): Expression = Upper(e) def lower(e: Expression): Expression = Lower(e) +def coalesce(args: Expression*): Expression = Coalesce(args) def sqrt(e: Expression): Expression = Sqrt(e) def abs(e: Expression): Expression = Abs(e) def star(names: String*): Expression = names match { http://git-wip-us.apache.org/repos/asf/spark/blob/166f3461/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala index 9ccf5aa..f1fad77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala @@ -46,7 +46,7 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate { override lazy val aggBufferAttributes = sum :: count :: Nil override lazy val initialValues = Seq( -/* sum = */ Cast(Literal(0), sumDataType), +/* sum = */ Literal(0).cast(sumDataType), /* count = */ Literal(0L) ) @@ -58,18 +58,16 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate { // If all input are nulls, count will be 0 and we will get null after the division. 
override lazy val evaluateExpression = child.dataType match { case _: DecimalType => - Cast( -DecimalPrecision.decimalAndDecimal(sum / Cast(count, DecimalType.LongDecimal)), -resultType) + DecimalPrecision.decimalAndDecimal(sum / count.cast(DecimalType.LongDecimal)).cast(resultType) case _ => - Cast(sum, resultType) / Cast(count, resultType) + sum.cast(resultType) / count.cast(resultType) } protected def updateExpressionsDef: Seq[Expression] = Seq( /* sum = */ Add( sum, - Coalesce(Cast(child, sumDataType) :: Cast(Literal(0), sumDataType) :: Nil)), + coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))), /* count = */ If(IsNull(child), count, count + 1L) ) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
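For context, a brief sketch of the dsl style the refactor moves to (Catalyst internals, so treat this as illustrative only): constructor nesting such as `Cast(Literal(0), LongType)` becomes `Literal(0).cast(LongType)`, and the newly added `coalesce` helper replaces building `Coalesce(... :: Nil)` by hand.
```
// Illustrative only; these are internal Catalyst APIs, not a stable interface.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.LongType

// Before: Cast(Literal(0), LongType)
val zero = Literal(0).cast(LongType)

// Before: Coalesce(Cast(child, LongType) :: Cast(Literal(0), LongType) :: Nil)
val safeChild = coalesce(Literal(42).cast(LongType), zero)
```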