[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...
Github user henryr closed the pull request at: https://github.com/apache/spark/pull/22211 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/22211 Merged to 2.1
[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/22211

[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.…

## What changes were proposed in this pull request?

Back port of #20393 and #22079.

Currently shuffle repartition uses RoundRobinPartitioning, so the generated result is nondeterministic because the order of the input rows is not determined.

The bug can be triggered when a repartition call follows a shuffle (which leads to non-deterministic row ordering), as the pattern below shows:

upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)

When one of the executor processes goes down, some tasks of the repartition stage will be retried and generate an inconsistent ordering, and some tasks of the result stage will be retried and generate different data.

The following code returns 931532, instead of 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose the most straightforward fix: perform a local sort before partitioning. Once the input row ordering is deterministic, the function from rows to partitions is fully deterministic too.

The downside of this approach is that the extra local sort slows down repartition(), so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is enabled by default to be safe by default, but users may turn it off manually to avoid the performance regression.

This patch also changes the output row ordering of repartition(), which causes a number of test cases to fail because they compare results directly.

Add unit test in ExchangeSuite.
With this patch (and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23207-branch-2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22211.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22211

commit 5bdd9e353287c2a326138b25f80ee255d15942b0
Author: Xingbo Jiang
Date: 2018-08-23T21:22:56Z

[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of #20393 and #22079.

Currently shuffle repartition uses RoundRobinPartitioning, so the generated result is nondeterministic because the order of the input rows is not determined.

The bug can be triggered when a repartition call follows a shuffle (which leads to non-deterministic row ordering), as the pattern below shows:

upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)

When one of the executor processes goes down, some tasks of the repartition stage will be retried and generate an inconsistent ordering, and some tasks of the result stage will be retried and generate different data.
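The effect described in this pull request can be illustrated without Spark. The sketch below is a simplified model, not Spark's `RoundRobinPartitioning`: the object name `RoundRobinSketch` and both helper functions are hypothetical, and real round-robin assignment in Spark also picks a starting partition per task. It only shows why assigning rows by arrival position is unstable across retries, and why a local sort beforehand makes the assignment a pure function of the row set.

```scala
object RoundRobinSketch {
  // Assign each row to a partition based only on its arrival position.
  def roundRobin(rows: Seq[Long], numPartitions: Int): Map[Long, Int] =
    rows.zipWithIndex.map { case (row, i) => row -> (i % numPartitions) }.toMap

  // Same assignment, but sort locally first so arrival order no longer matters.
  def sortedRoundRobin(rows: Seq[Long], numPartitions: Int): Map[Long, Int] =
    roundRobin(rows.sorted, numPartitions)

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq(3L, 1L, 2L, 4L)
    val retryAttempt = Seq(1L, 4L, 3L, 2L) // same rows, different arrival order

    // Without the sort, a retried task may send a row to a different partition.
    println(roundRobin(firstAttempt, 2) == roundRobin(retryAttempt, 2))
    // With the sort, both attempts produce the same row-to-partition mapping.
    println(sortedRoundRobin(firstAttempt, 2) == sortedRoundRobin(retryAttempt, 2))
  }
}
```

The cost of the sort is what motivates the `spark.sql.execution.sortBeforeRepartition` switch mentioned in the description.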
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21482 Any further comments here?
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r197234189

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -468,6 +468,18 @@ def input_file_name():
         return Column(sc._jvm.functions.input_file_name())
    +@since(2.4)
    +def isinf(col):
    --- End diff --

@HyukjinKwon could you clarify, please?
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21482

I think consistency in Spark's naming convention (and therefore increased discoverability by users) outweighs the advantage of naming it exactly after the Impala equivalent. I do agree that multiple aliases probably aren't worth the trouble at this point.

FWIW, I would have used this function if it had been available recently. So it's not just hypothetical. And to me this provides some symmetry in the support for 'special' float values, since we already have isnan().

On 7 June 2018 at 15:36, Reynold Xin wrote:

> Thanks, Henry. In general I'm not a huge fan of adding something because
> hypothetically somebody might want it. Also if you want this to be
> compatible with Impala, wouldn't you want to name this the same way as
> Impala?
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r193898812

    --- Diff: R/pkg/NAMESPACE ---
    @@ -281,6 +281,8 @@ exportMethods("%<=>%",
     "initcap",
     "input_file_name",
     "instr",
    +"isInf",
    +"isinf",
    --- End diff --

Do you know if there's any consistency behind the different capitalization schemes? There's `format_number`, `isnan` and `isNotNull` here, for example. If not, how about we just go with `isInf` for now and if other aliases are needed in the future they can be added and discussed then?
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21482 @rxin, that in itself is a bit weird, but there are ways to express inf values in Scala and thus inf values can show up flowing through Spark plans. I'm not sure MySQL has any such facility.
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21482

@rxin Other engines are all over the place:

* MySQL doesn't have support for infinity (based on my cursory look) - 1.0 / 0.0 is written as `null`. Also seems to be true of SQLite.
* Postgres has type-specific literals (e.g. `FLOAT8 '+Infinity'`) which you can use for comparison checks.
* Impala has `is_inf()`
* Oracle has a literal value, and also a built-in predicate `SELECT f FROM foo WHERE f is infinite`
* SQL Server does not appear to support infinity (but that's based on anecdotal evidence)

One of my motivations for suggesting this builtin was to make sharing workloads with Impala easier. I think it's convenient to have as well, since it's more intuitive than figuring out what the right literal incantation should be.
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r192522027

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala ---
    @@ -199,6 +199,50 @@ case class Nvl2(expr1: Expression, expr2: Expression, expr3: Expression, child:
       override def sql: String = s"$prettyName(${expr1.sql}, ${expr2.sql}, ${expr3.sql})"
     }
    +/**
    + * Evaluates to `true` iff it's Infinity.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr) - Returns True evaluates to infinite else returns False ",
    --- End diff --

"True evaluates" -> "True if expr evaluates"
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r192520713

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala ---
    @@ -56,6 +56,16 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         assert(ex.contains("Null value appeared in non-nullable field"))
       }
    +  test("IsInf") {
    +    checkEvaluation(IsInf(Literal(Double.PositiveInfinity)), true)
    +    checkEvaluation(IsInf(Literal(Double.NegativeInfinity)), true)
    +    checkEvaluation(IsInf(Literal(Float.PositiveInfinity)), true)
    +    checkEvaluation(IsInf(Literal(Float.NegativeInfinity)), true)
    +    checkEvaluation(IsInf(Literal.create(null, DoubleType)), false)
    +    checkEvaluation(IsInf(Literal(Float.MaxValue)), false)
    +    checkEvaluation(IsInf(Literal(5.5f)), false)
    --- End diff --

check NaN as well?
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r192521881

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala ---
    @@ -199,6 +199,50 @@ case class Nvl2(expr1: Expression, expr2: Expression, expr3: Expression, child:
       override def sql: String = s"$prettyName(${expr1.sql}, ${expr2.sql}, ${expr3.sql})"
     }
    +/**
    + * Evaluates to `true` iff it's Infinity.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr) - Returns True evaluates to infinite else returns False ",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(1/0);
    +       True
    +      > SELECT _FUNC_(5);
    +       False
    +  """)
    +case class IsInf(child: Expression) extends UnaryExpression
    +  with Predicate with ImplicitCastInputTypes {
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(DoubleType, FloatType))
    +
    +  override def nullable: Boolean = false
    +
    +  override def eval(input: InternalRow): Boolean = {
    +    val value = child.eval(input)
    +    if (value == null) {
    +      false
    +    } else {
    +      child.dataType match {
    +        case DoubleType => value.asInstanceOf[Double].isInfinity
    +        case FloatType => value.asInstanceOf[Float].isInfinity
    +      }
    +    }
    +  }
    +
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    val eval = child.genCode(ctx)
    +    child.dataType match {
    +      case DoubleType | FloatType =>
    +        ev.copy(code = code"""
    +          ${eval.code}
    +          ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
    +          ${ev.value} = !${eval.isNull} && Double.isInfinite(${eval.value});""",
    --- End diff --

out of interest, why use `Double.isInfinite` here, but `value.isInfinity` in the non-codegen version?
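The semantics under review (null is not infinite, NaN is not infinite, both Float and Double infinities count) can be checked in plain Scala. This is a minimal sketch under stated assumptions, not the `IsInf` expression from the pull request: `IsInfSketch` and its `isInf` helper are hypothetical names, and the helper works on boxed values rather than Catalyst rows. It also suggests that `java.lang.Double.isInfinite` and Scala's `isInfinity` agree, including for a Float widened to Double.

```scala
object IsInfSketch {
  // Hypothetical helper mirroring the reviewed semantics on plain JVM values.
  def isInf(value: Any): Boolean = value match {
    case null      => false         // a null input evaluates to false, not null
    case d: Double => d.isInfinity  // true for +Infinity and -Infinity only
    case f: Float  => f.isInfinity
    case _         => false
  }

  def main(args: Array[String]): Unit = {
    println(isInf(Double.PositiveInfinity))
    println(isInf(Float.NegativeInfinity))
    println(isInf(Double.NaN))      // NaN is a distinct special value
    println(isInf(Float.MaxValue))
    // The Java static check used in the generated code, applied to a widened Float.
    println(java.lang.Double.isInfinite(Float.PositiveInfinity.toDouble))
  }
}
```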
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r192520834

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
    @@ -1107,6 +1107,14 @@ object functions {
        */
       def input_file_name(): Column = withExpr { InputFileName() }
    +  /**
    +   * Return true iff the column is Infinity.
    +   *
    +   * @group normal_funcs
    +   * @since 1.6.0
    --- End diff --

Need to fix these versions, here and elsewhere. This change would land in Spark 2.4.0.
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21482#discussion_r192520566

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala ---
    @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
     import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
     import org.apache.spark.sql.types._
    -class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
    + class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
    --- End diff --

Revert this?
[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3
Github user henryr closed the pull request at: https://github.com/apache/spark/pull/21302
[GitHub] spark issue #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21302 Done.
[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21302#discussion_r188042296

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
    @@ -602,6 +602,16 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           }
         }
       }
    +
    +  test("SPARK-23852: Broken Parquet push-down for partially-written stats") {
    +    // parquet-1217.parquet contains a single column with values -1, 0, 1, 2 and null.
    +    // The row-group statistics include null counts, but not min and max values, which
    +    // triggers PARQUET-1217.
    +    val df = readResourceParquetFile("test-data/parquet-1217.parquet")
    --- End diff --

PR for master is https://github.com/apache/spark/pull/21323. My guess is there's no reason to block this backport and 2.3.1 by waiting for it to land, but happy to do whatever.
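The situation the test comment describes (null counts present, min and max absent) can be modelled in a few lines. The sketch below is an illustration only and is not parquet-mr code: `RowGroupStatsSketch`, `Stats` and `canDropForGt` are hypothetical names. It shows the safe behaviour for a pushed-down `col > value` predicate: a row group may be skipped only when a recorded maximum proves no row can match, and missing min/max must never be read as evidence.

```scala
object RowGroupStatsSketch {
  // Hypothetical, simplified row-group statistics: min/max may be absent
  // even though the null count was written.
  final case class Stats(min: Option[Int], max: Option[Int], nullCount: Long)

  // Can the row group be skipped for the predicate `col > value`?
  // Only if a recorded max shows that every value is <= `value`.
  def canDropForGt(stats: Stats, value: Int): Boolean =
    stats.max match {
      case Some(max) => max <= value
      case None      => false // no min/max recorded: the row group must be read
    }

  def main(args: Array[String]): Unit = {
    val partial = Stats(min = None, max = None, nullCount = 1L)
    val full    = Stats(min = Some(-1), max = Some(2), nullCount = 1L)
    println(canDropForGt(partial, 0)) // must not be dropped
    println(canDropForGt(full, 2))    // no value exceeds 2, safe to drop
    println(canDropForGt(full, 1))    // the value 2 matches, must be read
  }
}
```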
[GitHub] spark pull request #21323: [SPARK-23582][SQL] Add withSQLConf(...) to test c...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/21323

[SPARK-23582][SQL] Add withSQLConf(...) to test case

## What changes were proposed in this pull request?

Add a `withSQLConf(...)` wrapper to force Parquet filter pushdown for a test that relies on it.

## How was this patch tested?

Test passes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23582

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21323.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21323

commit fdcacd8868de0aca3d13ae5ca5a9e323f114fab9
Author: Henry Robinson <henry@...>
Date: 2018-05-14T17:48:22Z

[SPARK-23582][SQL] Add withSQLConf(...) to test case
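The general shape of a `withSQLConf(...)`-style wrapper is: set the requested keys, run the test body, then restore whatever was there before, even if the body throws. The sketch below models that pattern on a plain mutable map. It is not Spark's test helper: `WithConfSketch`, `conf` and `withConf` are hypothetical names, and `spark.sql.parquet.filterPushdown` is used only as an example key.

```scala
import scala.collection.mutable

object WithConfSketch {
  // Stand-in for a session's mutable configuration; not Spark's SQLConf.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Apply the overrides, run the body, then restore the previous values.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (key, _) => key -> conf.get(key) }
    pairs.foreach { case (key, value) => conf(key) = value }
    try body
    finally previous.foreach {
      case (key, Some(old)) => conf(key) = old
      case (key, None)      => conf.remove(key)
    }
  }

  def main(args: Array[String]): Unit = {
    conf("spark.sql.parquet.filterPushdown") = "false"
    val seenInside = withConf("spark.sql.parquet.filterPushdown" -> "true") {
      conf("spark.sql.parquet.filterPushdown")
    }
    println(seenInside)                               // value forced for the body
    println(conf("spark.sql.parquet.filterPushdown")) // original value restored
  }
}
```

Forcing the setting inside the wrapper keeps a test that depends on pushdown from silently passing or failing based on the session default.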
[GitHub] spark issue #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21302 Sounds good, done.
[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/21302

[SPARK-23852][SQL] Upgrade to Parquet 1.8.3

## What changes were proposed in this pull request?

Upgrade Parquet dependency to 1.8.3 to avoid PARQUET-1217

## How was this patch tested?

Ran testcase from SPARK-23852 (will backport in a separate PR after this goes in).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark branch-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21302.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21302

commit 35e214995201d6b3a9a013d0f8d2106b084f4de9
Author: Henry Robinson <henry@...>
Date: 2018-05-11T18:50:26Z

[SPARK-23852][SQL] Upgrade to Parquet 1.8.3
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21049 @dilipbiswal I tried hard to find something in the SQL standard that clarified the situation, but couldn't (of course that could be because the standard is pretty hard to parse... :)). So let's go with the idea of not dropping `ORDER BY` in inline views, but we can safely drop them in scalar subqueries and nested subqueries. What do you think?
[GitHub] spark pull request #21284: [SPARK-23852][SQL] Add test that fails if PARQUET...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/21284

[SPARK-23852][SQL] Add test that fails if PARQUET-1217 is not fixed

## What changes were proposed in this pull request?

Add a new test that triggers if PARQUET-1217 - a predicate pushdown bug - is not fixed in Spark's Parquet dependency.

## How was this patch tested?

New unit test passes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23852

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21284.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21284

commit 4b42ee2c54a2f5488f6cc4e65ce5a401c16a6d8b
Author: Henry Robinson <henry@...>
Date: 2018-04-12T18:53:13Z

[SPARK-23852][SQL] Add test that fails if PARQUET-1217 is not fixed
[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21201 Any chance to get this merged now the tests are working again? Thanks!
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21049

I might be a bit of a hardliner on this, but I think it's correct to eliminate the `ORDER BY` from common table expressions (e.g. MSSQL agrees with me, see [this link](https://docs.microsoft.com/en-us/sql/t-sql/queries/with-common-table-expression-transact-sql?view=sql-server-2017#guidelines-for-creating-and-using-common-table-expressions)).

However, given the principle of least surprise, I agree it might be a good idea to at least start with scalar and nested subqueries, and leave inline views for another day. That might be a bit harder to do (I think the rule will need a whitelist of operators it's ok to eliminate sorts below), and in general I think there'll be some missed opportunities, but it's a start :)

Alternatively we could extend the analyzed logical plan to explicitly mark the different subquery types (i.e. have an `InlineView` node, a `NestedSubquery` node and so on). That would make these optimizations easier to express, but I have some reservations about the semantics of introducing those nodes.

What do you think @dilipbiswal / @gatorsmile ?
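The whitelist idea in this comment can be sketched on a toy plan tree. Everything below is hypothetical and far simpler than Catalyst: `SortEliminationSketch`, its node types (including modelling a scalar subquery as a plan node) and the two functions are assumptions made for illustration. The rule drops sorts inside a scalar subquery, but only while descending through operators whose result does not depend on input order; it stops at `Limit`, where the sort decides which rows survive.

```scala
object SortEliminationSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Sort(order: String, child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan
  final case class ScalarSubquery(child: Plan) extends Plan

  // Remove sorts, descending only through whitelisted operators (here: Filter).
  def dropSorts(plan: Plan): Plan = plan match {
    case Sort(_, child)      => dropSorts(child)
    case Filter(cond, child) => Filter(cond, dropSorts(child))
    case other               => other // e.g. Limit: order matters below it
  }

  // Apply the rule only where ordering cannot be observed by the consumer.
  def optimize(plan: Plan): Plan = plan match {
    case ScalarSubquery(child) => ScalarSubquery(dropSorts(child))
    case Sort(order, child)    => Sort(order, optimize(child))
    case Filter(cond, child)   => Filter(cond, optimize(child))
    case Limit(n, child)       => Limit(n, optimize(child))
    case scan: Scan            => scan
  }
}
```

A top-level `Sort` is left alone by `optimize`, which matches the cautious starting point proposed above: missed opportunities are accepted in exchange for never changing a user-visible ordering.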
[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21145#discussion_r186143060

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
    @@ -22,20 +22,20 @@ import org.apache.spark.annotation.InterfaceStability;
     /**
    - * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
    + * A read task returned by {@link DataSourceReader#createReadTasks()} and is
    --- End diff --

Ok - how about `ReadTaskDescriptor`? (In that case I think it would be ok to leave the method name as `createReadTasks()`).
[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21145 I don't mind `ReadTask`. It's imperfect because 'task' implies that this is a thing that can be executed, whereas this interface doesn't have a way to pass control to the task object. It's more like a description of a read task, but I think `ReadTaskDescriptor` would be a bit verbose. I certainly agree that this is not a factory.
[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21201 No problem! It's a small usability fix.
[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21201 retest this please
[GitHub] spark pull request #21201: [SPARK-24128][SQL] Mention configuration option i...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/21201

[SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error

## What changes were proposed in this pull request?

Mention `spark.sql.crossJoin.enabled` in error message when an implicit `CROSS JOIN` is detected.

## How was this patch tested?

`CartesianProductSuite` and `JoinSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-24128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21201.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21201
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21049 @dilipbiswal thanks for the clarification. I agree that this particular case - where the alias is the root of a logical plan - might need special handling. Is there any reason to actually use an alias at the root of a plan like this (outside of composing with other plans, where this optimization would apply)? My suggestion would be, since there are no references to the name the alias introduces, to consider just dropping the alias node during optimization (and then the sort would not get dropped). It does seem to be an edge case though - no matter how we handle unreferred-to aliases, the optimization seems to be appropriate for the general case where aliases do correspond to subqueries. What do you think?
[GitHub] spark issue #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to be an in...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21122 cc @rdblue
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 yes, thanks @maropu! +1 to the idea of making this a jenkins job.
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21073 @gatorsmile this looks ready for your review (asking because you filed the JIRA) if you have time, thanks!
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 Ok, thanks for the context. I've worked on other projects where it's sometimes ok to take a small risk of a perf hit on trunk as long as the community is committed to addressing the issues before the next release. If the norms here are not to absorb any risk on trunk, that also seems reasonable.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 @cloud-fan since there's probably quite some time before this lands in a release, what do you think about merging this now if it's ready, and filing the perf jira as a blocker against 2.4? My guess is that Spark will want to move to 1.10 at some point no matter what, so doing it this way has the advantage of giving plenty of time for any other issues to shake out before the next release.
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r183560436

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2186,6 +2186,29 @@ def map_values(col):
         return Column(sc._jvm.functions.map_values(_to_java_column(col)))
    +@since(2.4)
    +def map_concat(*cols):
    +    """Returns the union of all the given maps. If a key is found in multiple given maps,
    +    that key's value in the resulting map comes from the last one of those maps.
    +
    +    :param cols: list of column names (string) or list of :class:`Column` expressions
    +
    +    >>> from pyspark.sql.functions import map_concat
    +    >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c', 1, 'd') as map2")
    +    >>> df.select(map_concat("map1", "map2").alias("map3")).show(truncate=False)
    +    +------------------------+
    +    |map3                    |
    +    +------------------------+
    +    |[1 -> d, 2 -> b, 3 -> c]|
    +    +------------------------+
    +    """
    +    sc = SparkContext._active_spark_context
    +    if len(cols) == 1 and isinstance(cols[0], (list, set)):
    +        cols = cols[0]
    --- End diff --

what's this for?
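The "last map wins" rule in the docstring, together with the insertion-ordered union used by the Scala side of this pull request, can be reproduced with a standard-library `LinkedHashMap`. This is a sketch under stated assumptions rather than the `MapConcat` expression itself: `MapConcatSketch` and `mapConcat` are hypothetical names, maps are modelled as ordered key/value sequences, and a null input map is modelled as `None`.

```scala
import scala.collection.mutable

object MapConcatSketch {
  // Union of the given maps in first-seen key order; for duplicate keys the
  // value from the last map wins. Any missing (null) input yields None.
  def mapConcat[K, V](maps: Seq[Option[Seq[(K, V)]]]): Option[List[(K, V)]] =
    if (maps.exists(_.isEmpty)) None
    else {
      val union = mutable.LinkedHashMap.empty[K, V]
      maps.flatten.foreach(_.foreach { case (k, v) => union.put(k, v) })
      Some(union.toList)
    }

  def main(args: Array[String]): Unit = {
    val map1 = Seq(1 -> "a", 2 -> "b")
    val map2 = Seq(3 -> "c", 1 -> "d")
    // Key 1 keeps its original position but takes the value from map2.
    println(mapConcat(Seq(Some(map1), Some(map2))))
    println(mapConcat(Seq(Some(map1), None)))
  }
}
```

Updating an existing key in a `LinkedHashMap` keeps its original position, which is why the docstring example lists key 1 first even though its value comes from the second map.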
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r183558371

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -116,6 +118,154 @@ case class MapValues(child: Expression)
       override def prettyName: String = "map_values"
     }
    +/**
    + * Returns the union of all the given maps.
    + */
    +@ExpressionDescription(
    +usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
    +examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
    +       [[1 -> "a"], [2 -> "c"], [3 -> "d"]
    +  """)
    +case class MapConcat(children: Seq[Expression]) extends Expression
    +  with CodegenFallback {
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    // this check currently does not allow valueContainsNull to vary,
    +    // and unfortunately none of the MapType toString methods include
    +    // valueContainsNull for the error message
    +    if (children.size < 2) {
    +      TypeCheckResult.TypeCheckFailure(
    +        s"$prettyName expects at least two input maps.")
    +    } else if (children.exists(!_.dataType.isInstanceOf[MapType])) {
    +      TypeCheckResult.TypeCheckFailure(
    +        s"The given input of function $prettyName should all be of type map, " +
    +          "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
    +    } else if (children.map(_.dataType).distinct.length > 1) {
    +      TypeCheckResult.TypeCheckFailure(
    +        s"The given input maps of function $prettyName should all be the same type, " +
    +          "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
    +    } else {
    +      TypeCheckResult.TypeCheckSuccess
    +    }
    +  }
    +
    +  override def dataType: MapType = {
    +    children.headOption.map(_.dataType.asInstanceOf[MapType])
    +      .getOrElse(MapType(keyType = StringType, valueType = StringType))
    +  }
    +
    +  override def nullable: Boolean = true
    +
    +  override def eval(input: InternalRow): Any = {
    +    val union = new util.LinkedHashMap[Any, Any]()
    +    children.map(_.eval(input)).foreach { raw =>
    +      if (raw == null) {
    +        return null
    +      }
    +      val map = raw.asInstanceOf[MapData]
    +      map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
    +        union.put(k, v)
    +      )
    +    }
    +    val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
    +      val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]]
    +      (e2.getKey, e2.getValue)
    +    }.unzip
    +    new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray))
    +  }
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    val mapCodes = children.map(c => c.genCode(ctx))
    +    val keyType = children.head.dataType.asInstanceOf[MapType].keyType
    +    val valueType = children.head.dataType.asInstanceOf[MapType].valueType
    +    val mapRefArrayName = ctx.freshName("mapRefArray")
    +    val unionMapName = ctx.freshName("union")
    +
    +    val mapDataClass = classOf[MapData].getName
    +    val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
    +    val arrayDataClass = classOf[ArrayData].getName
    +    val genericArrayDataClass = classOf[GenericArrayData].getName
    +    val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName
    +    val entryClass = classOf[util.Map.Entry[Any, Any]].getName
    +
    +    val init =
    +      s"""
    +        |$mapDataClass[] $mapRefArrayName = new $mapDataClass[${mapCodes.size}];
    +        |boolean ${ev.isNull} = false;
    +        |$mapDataClass ${ev.value} = null;
    +      """.stripMargin
    +
    +    val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
    +      val initCode = mapCodes(i).code
    +      val valueVarName = mapCodes(i).value.code
    +      s"""
    +         |$initCode
    +         |$mapRefArrayName[$i] = $valueVarName;
    +         |if ($valueVarName == null) {
    +         |  ${ev.isNull} = true;
    +         |}
    +       """.stripMargin
    +    }.mkString("\n")
    +
    +    val index1Name = ctx.freshName("idx1")
    +    val index2Name = ctx.freshName("idx2")
    +    val mapDataName = ctx.freshName("m")
    +    val kaName = ctx.freshName("ka")
    +    val vaName = ctx.freshName("va")
    +    val keyName = ctx.freshName("key")
    +    val valueName = ctx.freshName("value")
    +    val isNullCheckName = ctx.freshName("isNull")
    --- End diff --

unused?
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21073#discussion_r183559826

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -376,6 +376,35 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         )
       }
     
    +  test("map_concat function") {
    +    val df1 = Seq(
    +      (Map[Int, Int](1 -> 100, 2 -> 200), Map[Int, Int](3 -> 300, 4 -> 400)),
    +      (Map[Int, Int](1 -> 100, 2 -> 200), Map[Int, Int](3 -> 300, 1 -> 400)),
    +      (null, Map[Int, Int](3 -> 300, 4 -> 400))
    +    ).toDF("map1", "map2")
    +    checkAnswer(
    +      df1.selectExpr("map_concat(map1, map2)"),
    +      Seq(
    +        Row(Map(1 -> 100, 2 -> 200, 3 -> 300, 4 -> 400)),
    +        Row(Map(1 -> 400, 2 -> 200, 3 -> 300)),
    +        Row(null)
    +      )
    +    )
    +
    +    val df2 = Seq(
    +      (Map[Int, Int](1 -> 100, 2 -> 200), Map[String, Int]("3" -> 300, "4" -> 400))
    +    ).toDF("map1", "map2")
    +    assert(intercept[AnalysisException] {
    +      df2.selectExpr("map_concat(map1, map2)").collect()
    +    }.getMessage().contains("input maps of function map_concat should all be the same type"))
    +    assert(intercept[AnalysisException] {
    --- End diff --

    can you put a blank line between tests? makes it a bit easier to see the separation.
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r183559190 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -116,6 +118,154 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """) +case class MapConcat(children: Seq[Expression]) extends Expression + with CodegenFallback { + + override def checkInputDataTypes(): TypeCheckResult = { +// this check currently does not allow valueContainsNull to vary, +// and unfortunately none of the MapType toString methods include +// valueContainsNull for the error message +if (children.size < 2) { + TypeCheckResult.TypeCheckFailure( +s"$prettyName expects at least two input maps.") +} else if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: MapType = { +children.headOption.map(_.dataType.asInstanceOf[MapType]) + .getOrElse(MapType(keyType = StringType, valueType = StringType)) + } + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { +val union = new util.LinkedHashMap[Any, Any]() +children.map(_.eval(input)).foreach { 
raw => + if (raw == null) { +return null + } + val map = raw.asInstanceOf[MapData] + map.foreach(dataType.keyType, dataType.valueType, (k, v) => +union.put(k, v) + ) +} +val (keyArray, valueArray) = union.entrySet().toArray().map { e => + val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]] + (e2.getKey, e2.getValue) +}.unzip +new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val mapCodes = children.map(c => c.genCode(ctx)) +val keyType = children.head.dataType.asInstanceOf[MapType].keyType +val valueType = children.head.dataType.asInstanceOf[MapType].valueType +val mapRefArrayName = ctx.freshName("mapRefArray") +val unionMapName = ctx.freshName("union") + +val mapDataClass = classOf[MapData].getName +val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName +val arrayDataClass = classOf[ArrayData].getName +val genericArrayDataClass = classOf[GenericArrayData].getName +val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName +val entryClass = classOf[util.Map.Entry[Any, Any]].getName + +val init = + s""" +|$mapDataClass[] $mapRefArrayName = new $mapDataClass[${mapCodes.size}]; +|boolean ${ev.isNull} = false; +|$mapDataClass ${ev.value} = null; + """.stripMargin + +val assignments = mapCodes.zipWithIndex.map { case (m, i) => + val initCode = mapCodes(i).code + val valueVarName = mapCodes(i).value.code + s""" + |$initCode + |$mapRefArrayName[$i] = $valueVarName; + |if ($valueVarName == null) { + | ${ev.isNull} = true; + |} + """.stripMargin +}.mkString("\n") + +val index1Name = ctx.freshName("idx1") +val index2Name = ctx.freshName("idx2") +val mapDataName = ctx.freshName("m") +val kaName = ctx.freshName("ka") +val vaName = ctx.freshName("va") +val keyNam
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r183560201 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -116,6 +118,154 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """) +case class MapConcat(children: Seq[Expression]) extends Expression + with CodegenFallback { + + override def checkInputDataTypes(): TypeCheckResult = { +// this check currently does not allow valueContainsNull to vary, +// and unfortunately none of the MapType toString methods include +// valueContainsNull for the error message +if (children.size < 2) { + TypeCheckResult.TypeCheckFailure( +s"$prettyName expects at least two input maps.") +} else if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: MapType = { +children.headOption.map(_.dataType.asInstanceOf[MapType]) + .getOrElse(MapType(keyType = StringType, valueType = StringType)) + } + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { +val union = new util.LinkedHashMap[Any, Any]() +children.map(_.eval(input)).foreach { 
raw => + if (raw == null) { +return null + } + val map = raw.asInstanceOf[MapData] + map.foreach(dataType.keyType, dataType.valueType, (k, v) => +union.put(k, v) + ) +} +val (keyArray, valueArray) = union.entrySet().toArray().map { e => + val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]] + (e2.getKey, e2.getValue) +}.unzip +new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val mapCodes = children.map(c => c.genCode(ctx)) --- End diff -- FWIW, I don't really feel strongly either way here. The codegen method isn't so large as to be hard to understand yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r183559429 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -116,6 +118,154 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """) +case class MapConcat(children: Seq[Expression]) extends Expression + with CodegenFallback { + + override def checkInputDataTypes(): TypeCheckResult = { +// this check currently does not allow valueContainsNull to vary, +// and unfortunately none of the MapType toString methods include +// valueContainsNull for the error message +if (children.size < 2) { + TypeCheckResult.TypeCheckFailure( +s"$prettyName expects at least two input maps.") +} else if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: MapType = { +children.headOption.map(_.dataType.asInstanceOf[MapType]) + .getOrElse(MapType(keyType = StringType, valueType = StringType)) + } + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { +val union = new util.LinkedHashMap[Any, Any]() +children.map(_.eval(input)).foreach { 
raw => + if (raw == null) { +return null + } + val map = raw.asInstanceOf[MapData] + map.foreach(dataType.keyType, dataType.valueType, (k, v) => +union.put(k, v) + ) +} +val (keyArray, valueArray) = union.entrySet().toArray().map { e => + val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]] + (e2.getKey, e2.getValue) +}.unzip +new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val mapCodes = children.map(c => c.genCode(ctx)) +val keyType = children.head.dataType.asInstanceOf[MapType].keyType +val valueType = children.head.dataType.asInstanceOf[MapType].valueType +val mapRefArrayName = ctx.freshName("mapRefArray") +val unionMapName = ctx.freshName("union") + +val mapDataClass = classOf[MapData].getName +val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName +val arrayDataClass = classOf[ArrayData].getName +val genericArrayDataClass = classOf[GenericArrayData].getName +val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName +val entryClass = classOf[util.Map.Entry[Any, Any]].getName + +val init = + s""" +|$mapDataClass[] $mapRefArrayName = new $mapDataClass[${mapCodes.size}]; +|boolean ${ev.isNull} = false; +|$mapDataClass ${ev.value} = null; + """.stripMargin + +val assignments = mapCodes.zipWithIndex.map { case (m, i) => + val initCode = mapCodes(i).code + val valueVarName = mapCodes(i).value.code + s""" + |$initCode + |$mapRefArrayName[$i] = $valueVarName; + |if ($valueVarName == null) { + | ${ev.isNull} = true; + |} + """.stripMargin +}.mkString("\n") + +val index1Name = ctx.freshName("idx1") +val index2Name = ctx.freshName("idx2") +val mapDataName = ctx.freshName("m") +val kaName = ctx.freshName("ka") +val vaName = ctx.freshName("va") +val keyNam
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21073#discussion_r183559663

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
    @@ -56,6 +58,26 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
         checkEvaluation(MapValues(m2), null)
       }
     
    +  test("Map Concat") {
    +    val m0 = Literal.create(Map("a" -> "1", "b" -> "2"), MapType(StringType, StringType))
    +    val m1 = Literal.create(Map("c" -> "3", "a" -> "4"), MapType(StringType, StringType))
    +    val m2 = Literal.create(Map("d" -> "4", "e" -> "5"), MapType(StringType, StringType))
    +    val mNull = Literal.create(null, MapType(StringType, StringType))
    +
    +    // overlapping maps
    +    checkEvaluation(MapConcat(Seq(m0, m1)),
    +      mutable.LinkedHashMap("a" -> "4", "b" -> "2", "c" -> "3"))
    +    // maps with no overlap
    +    checkEvaluation(MapConcat(Seq(m0, m2)),
    +      mutable.LinkedHashMap("a" -> "1", "b" -> "2", "d" -> "4", "e" -> "5"))
    +    // 3 maps
    +    checkEvaluation(MapConcat(Seq(m0, m1, m2)),
    +      mutable.LinkedHashMap("a" -> "4", "b" -> "2", "c" -> "3", "d" -> "4", "e" -> "5"))
    +    // null map
    +    checkEvaluation(MapConcat(Seq(m0, mNull)),
    --- End diff --

    good idea to check `Seq(mNull, m0)` as well in case there's any asymmetry in the way the first argument is handled.
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21049 @dilipbiswal Thanks! Although Spark doesn't necessarily parse the query in the `from` clause as a subquery, is it fair to say it plans it as one? (Since the planner puts the alias under a `SubqueryAlias` node). The optimization of removing the sorts seems valid to me in any case, since neither an alias nor a subquery should be sorting except in the presence of a `limit`. Or do you think that this optimization should only be applied to subqueries that aren't in the `from` clause?
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 This looks pretty good to me - are there any committers that can give it a (hopefully) final review?
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r182547477 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -115,6 +116,62 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """) +case class MapConcat(children: Seq[Expression]) extends Expression + with CodegenFallback { + + override def checkInputDataTypes(): TypeCheckResult = { +// this check currently does not allow valueContainsNull to vary, +// and unfortunately none of the MapType toString methods include +// valueContainsNull for the error message +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + override def dataType: MapType = { +children.headOption.map(_.dataType.asInstanceOf[MapType]) + .getOrElse(MapType(keyType = StringType, valueType = StringType)) + } + + override def nullable: Boolean = false --- End diff -- Hm, seems a bit unusual to me to have, in effect, `NULL ++ NULL => Map()`. 
I checked with Presto and it looks like it returns `NULL`:

    presto> select map_concat(NULL, NULL)
         -> ;
     _col0
    ---
     NULL
    (1 row)
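The NULL-propagating behaviour discussed in this comment can be sketched in plain Scala. This is only a model under stated assumptions: `None` stands in for SQL `NULL`, and `MapConcatNullSketch` and `mapConcat` are hypothetical names, not Spark's `MapConcat` expression or Presto's implementation.

```scala
object MapConcatNullSketch {
  // Plain-Scala model: None stands in for SQL NULL (hypothetical helper, not Spark code).
  def mapConcat[K, V](maps: Seq[Option[Map[K, V]]]): Option[Map[K, V]] =
    if (maps.exists(_.isEmpty)) {
      None // any NULL input makes the whole result NULL
    } else {
      // Later maps win on duplicate keys, matching the expectations in the PR's tests.
      Some(maps.collect { case Some(m) => m }.foldLeft(Map.empty[K, V])(_ ++ _))
    }

  def main(args: Array[String]): Unit = {
    val m0: Option[Map[String, String]] = Some(Map("a" -> "1", "b" -> "2"))
    val m1: Option[Map[String, String]] = Some(Map("c" -> "3", "a" -> "4"))
    val mNull: Option[Map[String, String]] = None

    println(mapConcat(Seq(m0, m1)))       // overlapping maps
    println(mapConcat(Seq(m0, mNull)))    // NULL as the second argument
    println(mapConcat(Seq(mNull, m0)))    // NULL as the first argument
    println(mapConcat(Seq(mNull, mNull))) // NULL ++ NULL
  }
}
```

Under this model the result is NULL regardless of which argument is NULL, so an expression with these semantics would have to report itself as nullable.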
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 @scottcarey I agree that's important. Perhaps it could be done as a follow-up PR?
[GitHub] spark pull request #21073: [SPARK-23936][SQL][WIP] Implement map_concat
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r181915827 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -115,6 +116,62 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """) +case class MapConcat(children: Seq[Expression]) extends Expression + with CodegenFallback { + + override def checkInputDataTypes(): TypeCheckResult = { +// this check currently does not allow valueContainsNull to vary, +// and unfortunately none of the MapType toString methods include +// valueContainsNull for the error message +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + override def dataType: MapType = { +children.headOption.map(_.dataType.asInstanceOf[MapType]) + .getOrElse(MapType(keyType = StringType, valueType = StringType)) + } + + override def nullable: Boolean = false --- End diff -- What's the result of `map_concat(NULL, NULL)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181908529 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.{Clock, SystemClock, Utils} + +private[spark] class YarnAllocatorBlacklistTracker( +sparkConf: SparkConf, +amClient: AMRMClient[ContainerRequest], +failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker) + extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + private val BLACKLIST_TIMEOUT_MILLIS = + sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT)) + + private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED = +sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false) + + private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = sparkConf.get(MAX_FAILED_EXEC_PER_NODE) + + private val BLACKLIST_SIZE_LIMIT = sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT) + + private val BLACKLIST_SIZE_DEFAULT_WEIGHT = sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT) + + private var clock: Clock = new SystemClock + + private val allocationBlacklistedNodesWithExpiry = new HashMap[String, Long]() + + private var currentBlacklistedYarnNodes = Set.empty[String] + + private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long] --- End diff -- Do you need to keep a separate data structure for the scheduler and allocator blacklisted nodes? Instead, could you add the scheduler ones into a shared map when `setSchedulerBlacklistedNodes` is called? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181911744 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging { + + private var clock: Clock = new SystemClock + + private val executorFailuresValidityInterval = + sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + + // Queue to store the timestamp of failed executors for each host + private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() + + private val failedExecutorsTimeStamps = new mutable.Queue[Long]() + + private def getNumFailuresWithinValidityInterval( + failedExecutorsTimeStampsForHost: mutable.Queue[Long], + endTime: Long): Int = { +while (executorFailuresValidityInterval > 0 + && failedExecutorsTimeStampsForHost.nonEmpty + && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) { --- End diff -- This relies on the fact the `clock` is monotonic, but if it's a `SystemClock` it's based on `System.currentTimeMillis()` which is not monotonic and can time-travel. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
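One way to address the monotonicity concern is to derive elapsed milliseconds from `System.nanoTime()`, which the JDK documents as suitable for measuring elapsed time and unrelated to wall-clock time. The sketch below is an illustration only: `MillisClock` and `NanoTimeClock` are hypothetical names and deliberately do not implement Spark's `Clock` trait.

```scala
import java.util.concurrent.TimeUnit

object MonotonicClockSketch {
  // Hypothetical minimal clock interface; not org.apache.spark.util.Clock.
  trait MillisClock {
    def nowMillis(): Long
  }

  // Reports milliseconds elapsed since construction, based on System.nanoTime(),
  // so readings are not affected by wall-clock adjustments.
  final class NanoTimeClock extends MillisClock {
    private val originNanos = System.nanoTime()

    override def nowMillis(): Long =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - originNanos)
  }

  def main(args: Array[String]): Unit = {
    val clock = new NanoTimeClock
    val t0 = clock.nowMillis()
    val t1 = clock.nowMillis()
    println(s"elapsed between readings: ${t1 - t0} ms")
  }
}
```

Readings from such a clock are only meaningful within a single JVM, so they suit interval checks like the one in this diff but not timestamps that are shared across processes.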
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181910914 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging { + + private var clock: Clock = new SystemClock + + private val executorFailuresValidityInterval = + sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + + // Queue to store the timestamp of failed executors for each host + private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() + + private val failedExecutorsTimeStamps = new mutable.Queue[Long]() + + private def getNumFailuresWithinValidityInterval( + failedExecutorsTimeStampsForHost: mutable.Queue[Long], + endTime: Long): Int = { +while (executorFailuresValidityInterval > 0 + && failedExecutorsTimeStampsForHost.nonEmpty + && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) { + failedExecutorsTimeStampsForHost.dequeue() --- End diff -- It's counter-intuitive that this `get*` method mutates state. If I called getNumFailuresWithinValidityInterval(foo, 0) getNumFailuresWithinValidityInterval(foo, 10) getNumFailuresWithinValidityInterval(foo, 0) The last call can return something different from the first because all the failures that weren't within `10 - executorFailuresValidityInterval` will have been dropped. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
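A minimal sketch of one way to avoid the surprise described above: keep the counting query free of side effects and expose pruning as a separate, explicitly named operation. `FailureWindow`, `record`, `countWithin` and `pruneOlderThan` are hypothetical names for illustration, not the PR's API, and the sketch assumes timestamps are recorded in non-decreasing order.

```scala
import scala.collection.mutable

object FailureWindowSketch {
  // Hypothetical tracker; a non-positive interval means "count every failure".
  final class FailureWindow(validityIntervalMs: Long) {
    private val timestamps = mutable.Queue[Long]()

    def record(timeMs: Long): Unit = {
      timestamps += timeMs
    }

    // Pure query: repeated calls with the same argument return the same answer.
    def countWithin(endTimeMs: Long): Int =
      if (validityIntervalMs <= 0) timestamps.size
      else timestamps.count(_ >= endTimeMs - validityIntervalMs)

    // Explicit mutation: drops entries that fell out of the window ending at endTimeMs.
    def pruneOlderThan(endTimeMs: Long): Unit = {
      while (validityIntervalMs > 0 && timestamps.nonEmpty &&
          timestamps.head < endTimeMs - validityIntervalMs) {
        timestamps.dequeue()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val window = new FailureWindow(validityIntervalMs = 5L)
    Seq(1L, 3L, 8L).foreach(window.record)
    // Same call sequence as in the review comment: 0, 10, 0.
    println(window.countWithin(0L))
    println(window.countWithin(10L))
    println(window.countWithin(0L))
  }
}
```

The trade-off is that the query scans the queue instead of relying on earlier calls having trimmed it, so a caller that cares about memory still needs to call the pruning method from time to time.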
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181907316 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.{Clock, SystemClock, Utils} + +private[spark] class YarnAllocatorBlacklistTracker( +sparkConf: SparkConf, +amClient: AMRMClient[ContainerRequest], +failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker) + extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + private val BLACKLIST_TIMEOUT_MILLIS = + sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT)) + + private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED = +sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false) + + private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = sparkConf.get(MAX_FAILED_EXEC_PER_NODE) + + private val BLACKLIST_SIZE_LIMIT = sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT) + + private val BLACKLIST_SIZE_DEFAULT_WEIGHT = sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT) + + private var clock: Clock = new SystemClock + + private val allocationBlacklistedNodesWithExpiry = new HashMap[String, Long]() + + private var currentBlacklistedYarnNodes = Set.empty[String] + + private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long] + + private var numClusterNodes = (Int.MaxValue / BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt + + def setNumClusterNodes(numClusterNodes: Int): Unit = { +this.numClusterNodes = numClusterNodes + } + + /** + * Use a different clock. This is mainly used for testing. 
+ */ + def setClock(newClock: Clock): Unit = { +clock = newClock + } + + def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = { +hostOpt match { + case Some(hostname) => +// failures on a already blacklisted nodes are not even tracked +// otherwise such failures could shutdown the application +// as resource requests are asynchronous +// and a late failure response could exceed MAX_EXECUTOR_FAILURES +if (!schedulerBlacklistedNodesWithExpiry.contains(hostname) && + !allocationBlacklistedNodesWithExpiry.contains(hostname)) { + failureWithinTimeIntervalTracker.registerFailureOnHost(hostname) + updateAllocationBlacklistedNodes(hostname) +} + case None => +failureWithinTimeIntervalTracker.registerExecutorFailure() +} + } + + private def updateAllocationBlacklistedNodes(hostname: String): Unit = { +if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) { --- End diff -- consider just: if (!IS_YARN_ALLOCATION_BLACKLIST_ENABLED) return; to save a level of indentation below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
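The early-return suggestion in the comment above can be sketched as follows. This is a minimal Java illustration, not the Scala class under review: `GuardClauseSketch`, its fields and its method signatures are hypothetical stand-ins for `IS_YARN_ALLOCATION_BLACKLIST_ENABLED`, `BLACKLIST_MAX_FAILED_EXEC_PER_NODE` and the tracker's maps.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the tracker under review; not Spark code. */
final class GuardClauseSketch {
  private final boolean blacklistEnabled;      // role of IS_YARN_ALLOCATION_BLACKLIST_ENABLED
  private final int maxFailedExecPerNode;      // role of BLACKLIST_MAX_FAILED_EXEC_PER_NODE
  private final Map<String, Integer> failuresPerHost = new HashMap<>();
  private final Map<String, Long> blacklistedUntilMs = new HashMap<>();

  GuardClauseSketch(boolean blacklistEnabled, int maxFailedExecPerNode) {
    this.blacklistEnabled = blacklistEnabled;
    this.maxFailedExecPerNode = maxFailedExecPerNode;
  }

  void registerFailureOnHost(String hostname) {
    failuresPerHost.merge(hostname, 1, Integer::sum);
  }

  void updateAllocationBlacklistedNodes(String hostname, long nowMs, long timeoutMs) {
    // Guard clause: return at once when the feature is off, so the logic
    // below does not need to sit inside an enclosing if block.
    if (!blacklistEnabled) {
      return;
    }
    int failuresOnHost = failuresPerHost.getOrDefault(hostname, 0);
    if (failuresOnHost > maxFailedExecPerNode) {
      blacklistedUntilMs.put(hostname, nowMs + timeoutMs);
    }
  }

  boolean isBlacklisted(String hostname) {
    return blacklistedUntilMs.containsKey(hostname);
  }
}
```

The behaviour is the same as the nested form; only the indentation of the main path changes.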
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181912300 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging { + + private var clock: Clock = new SystemClock + + private val executorFailuresValidityInterval = + sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + + // Queue to store the timestamp of failed executors for each host + private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() + + private val failedExecutorsTimeStamps = new mutable.Queue[Long]() + + private def getNumFailuresWithinValidityInterval( --- End diff -- It's not really clear what a 'validity interval' is. I think it means that only failures that have happened recently are considered valid? 
I think it would be clearer to call this `getNumFailuresSince()`, `getRecentFailureCount()` or similar, and to pass in explicitly the timestamp from which the caller wants failures counted. If you go with `getRecentFailureCount()` and drop the `endTime` argument, you partly address the issue I raise below about how this mutates state, because that name suggests more clearly that the method takes the current time into account.
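To make the naming suggestion concrete, here is a minimal Java sketch of a counter whose query method takes the "since" timestamp explicitly. The class `RecentFailureCounter` and its methods are hypothetical; the tracker in the PR is Scala and, as the comment notes, mutates its queue when queried, which this sketch deliberately does not do.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the suggested API shape; not the code under review. */
final class RecentFailureCounter {
  // Failure timestamps in milliseconds, in the order they were registered.
  private final Deque<Long> failureTimestampsMs = new ArrayDeque<>();

  void registerFailure(long timestampMs) {
    failureTimestampsMs.addLast(timestampMs);
  }

  /**
   * Counts failures registered at or after sinceMs. The caller decides what
   * "recent" means, and the call leaves the stored timestamps untouched.
   */
  int getNumFailuresSince(long sinceMs) {
    int count = 0;
    for (long timestampMs : failureTimestampsMs) {
      if (timestampMs >= sinceMs) {
        count += 1;
      }
    }
    return count;
  }
}
```

A caller that wants a sliding window would compute `sinceMs` as the current clock time minus the window length and pass it in.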
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181907395 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.{Clock, SystemClock, Utils} + +private[spark] class YarnAllocatorBlacklistTracker( +sparkConf: SparkConf, +amClient: AMRMClient[ContainerRequest], +failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker) + extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + private val BLACKLIST_TIMEOUT_MILLIS = + sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT)) + + private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED = +sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false) + + private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = sparkConf.get(MAX_FAILED_EXEC_PER_NODE) + + private val BLACKLIST_SIZE_LIMIT = sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT) + + private val BLACKLIST_SIZE_DEFAULT_WEIGHT = sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT) + + private var clock: Clock = new SystemClock + + private val allocationBlacklistedNodesWithExpiry = new HashMap[String, Long]() + + private var currentBlacklistedYarnNodes = Set.empty[String] + + private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long] + + private var numClusterNodes = (Int.MaxValue / BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt + + def setNumClusterNodes(numClusterNodes: Int): Unit = { +this.numClusterNodes = numClusterNodes + } + + /** + * Use a different clock. This is mainly used for testing. 
+ */ + def setClock(newClock: Clock): Unit = { +clock = newClock + } + + def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = { +hostOpt match { + case Some(hostname) => +// failures on a already blacklisted nodes are not even tracked +// otherwise such failures could shutdown the application +// as resource requests are asynchronous +// and a late failure response could exceed MAX_EXECUTOR_FAILURES +if (!schedulerBlacklistedNodesWithExpiry.contains(hostname) && + !allocationBlacklistedNodesWithExpiry.contains(hostname)) { + failureWithinTimeIntervalTracker.registerFailureOnHost(hostname) + updateAllocationBlacklistedNodes(hostname) +} + case None => +failureWithinTimeIntervalTracker.registerExecutorFailure() +} + } + + private def updateAllocationBlacklistedNodes(hostname: String): Unit = { +if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) { + val failuresOnHost = failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname) + if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) { +logInfo("blacklisting host as YARN allocation failed: %s".format(hostname)) --- End diff -- log msg could include the number of failures --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
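One possible shape for a log message that includes the failure count, sketched in Java. The helper name and the exact wording are assumptions for illustration; the PR's own message is the Scala `format` call shown in the diff.

```java
import java.util.Locale;

/** Hypothetical helper; the wording of the message is only a suggestion. */
final class BlacklistLogMessage {
  static String format(String hostname, int failuresOnHost, int maxFailedExecPerNode) {
    // Reporting both the observed count and the limit shows why the host was blacklisted.
    return String.format(
        Locale.ROOT,
        "blacklisting host %s as YARN allocation failed %d times (limit %d)",
        hostname, failuresOnHost, maxFailedExecPerNode);
  }
}
```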
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181901031 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putInt(rowId + i, buffer.getInt()); + } +} } @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { -c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putLong(rowId + i, buffer.getLong()); + } +} } @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { -c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + 
buffer.position(); + c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putFloat(rowId + i, buffer.getFloat()); + } +} } @Override public final void readDoubles(int total, WritableColumnVector c, int rowId) { -c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putDouble(rowId + i, buffer.getDouble()); + } +} + } + + private byte getByte() { --- End diff -- Is this used anywhere other than line 154? If not, can be inlined. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181902045 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putInt(rowId + i, buffer.getInt()); + } +} } @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { -c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putLong(rowId + i, buffer.getLong()); + } +} } @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { -c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + 
buffer.position(); + c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putFloat(rowId + i, buffer.getFloat()); + } +} } @Override public final void readDoubles(int total, WritableColumnVector c, int rowId) { -c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putDouble(rowId + i, buffer.getDouble()); + } +} + } + + private byte getByte() { +try { + return (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} } @Override public final void readBytes(int total, WritableColumnVector c, int rowId) { -for (int i = 0; i < total; i++) { - // Bytes are stored as a 4-byte little endian int. Just read the first byte. - // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride. - c.putByte(rowId + i, Platform.getByte(buffer, offset)); - offset += 4; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { + c.putByte(rowId + i, buffer.get()); --- End diff -- could you preserve the comment about "Bytes are stored as 4-byte little endian int. Just read the first byte."? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
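The comment the reviewer asks to preserve describes a layout detail that is easy to miss: each byte value occupies a 4-byte little-endian int, so only the first byte of each group carries data. The sketch below shows that layout with plain `java.nio`; `PlainByteDecodingSketch` is a hypothetical class that writes to a `byte[]` instead of Spark's `WritableColumnVector`, and it does not claim to match the final code of the PR.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of decoding bytes stored as 4-byte ints; not Spark code. */
final class PlainByteDecodingSketch {
  /** Reads total byte values from buffer, advancing its position by 4 * total. */
  static byte[] readBytes(ByteBuffer buffer, int total) {
    byte[] out = new byte[total];
    for (int i = 0; i < total; i += 1) {
      // Bytes are stored as a 4-byte little endian int. Just read the first byte,
      // which is the low-order byte of the int.
      out[i] = buffer.get();
      // Skip the three high-order bytes of the same int.
      buffer.position(buffer.position() + 3);
    }
    return out;
  }
}
```

Without the skip, a loop that calls `get()` once per value would read padding bytes as data, which is why the explanatory comment is worth keeping next to the loop.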
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181889287 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- It seems worth it to me, to be defensive against performance changes - but feel free to punt it to me as a follow-on patch if you'd rather. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181882476 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Isn't that what `hasArray()` is for though? If the buffers are backed by a byte array, `hasArray()` returns true and accessing the byte array via `array()` should be 0 cost. (If `array()` actually copies any data, that would invalidate this line of reasoning but would also be unexpected). So for example, here you'd have: public final void readIntegers(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 4; ByteBuffer buffer = getBuffer(requiredBytes); if (buffer.hasArray()) { c.putIntsLittleEndian(rowId, total, buffer.array(), 0); } else { for (int i = 0; i < total; i += 1) { c.putInt(rowId + i, buffer.getInt()); } } } This seems to be the same pattern that's in `readBinary()`, below. Let me know if I'm missing something! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
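The `hasArray()` branching discussed above can be tried outside Spark with a small Java sketch. `BulkIntReadSketch` is hypothetical: it fills an `int[]` rather than a `WritableColumnVector`, and a hand-written decode loop over the backing array stands in for `putIntsLittleEndian`. One detail worth noting is that a sliced heap buffer may start partway into its backing array, so the sketch computes the start as `arrayOffset() + position()` rather than assuming 0.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch of the hasArray() fast path; not Spark code. */
final class BulkIntReadSketch {
  /** Reads total little-endian ints from buffer into dst, advancing the buffer. */
  static void readIntegers(ByteBuffer buffer, int total, int[] dst, int dstPos) {
    buffer.order(ByteOrder.LITTLE_ENDIAN);  // note: changes the caller's buffer order
    if (buffer.hasArray()) {
      // Heap buffer: array() exposes the backing array without copying it.
      byte[] bytes = buffer.array();
      int offset = buffer.arrayOffset() + buffer.position();
      for (int i = 0; i < total; i += 1) {
        int b = offset + 4 * i;
        dst[dstPos + i] = (bytes[b] & 0xFF)
            | ((bytes[b + 1] & 0xFF) << 8)
            | ((bytes[b + 2] & 0xFF) << 16)
            | ((bytes[b + 3] & 0xFF) << 24);
      }
      // Direct array access does not move the position, so advance it by hand.
      buffer.position(buffer.position() + 4 * total);
    } else {
      // Direct or read-only buffer: fall back to one getInt() per value.
      for (int i = 0; i < total; i += 1) {
        dst[dstPos + i] = buffer.getInt();
      }
    }
  }
}
```

Both branches should produce identical output and leave the buffer at the same position; the only intended difference is how the bytes are reached.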
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r181847445 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala --- @@ -98,4 +98,31 @@ class RemoveRedundantSortsSuite extends PlanTest { val correctAnswer = groupedAndResorted.analyze comparePlans(optimized, correctAnswer) } + + test("remove two consecutive sorts") { +val orderedTwice = testRelation.orderBy('a.asc).orderBy('b.desc) +val optimized = Optimize.execute(orderedTwice.analyze) +val correctAnswer = testRelation.orderBy('b.desc).analyze +comparePlans(optimized, correctAnswer) + } --- End diff -- Can you add a test for three consecutive sorts? Two is the base case, three will help us show the inductive case :)
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r181851593 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -736,12 +736,22 @@ object EliminateSorts extends Rule[LogicalPlan] { } /** - * Removes Sort operation if the child is already sorted + * Removes redundant Sort operation. This can happen: + * 1) if the child is already sorted + * 2) if the there is another Sort operator separated by 0...n Project/Filter operators --- End diff -- nit: 'the there'
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r181849951 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala --- @@ -98,4 +98,31 @@ class RemoveRedundantSortsSuite extends PlanTest { val correctAnswer = groupedAndResorted.analyze comparePlans(optimized, correctAnswer) } + --- End diff -- Could you add a test which explicitly confirms that sort.limit.sort is not simplified? I know the above two tests cover that case, but it's good to have one dedicated to testing this important property.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181846514 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Agreed that fixing the `ByteBuffer` / `ColumnVector` interaction should be dealt with elsewhere. I'm just raising the possibility of _regressing_ the read path here because the copies are less efficient. Since it's going to be a while before 2.4.0, that might be ok if we commit to fixing it - but it superficially seems like a manageable change to the PR since the code to call the bulk APIs is already there. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21049#discussion_r181839715 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] { } } +/** + * Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their + * order. Subqueries produce unordered sets of rows so sorting their output is unnecessary. + */ +object RemoveSubquerySorts extends Rule[LogicalPlan] { + + /** + * Removes all [[Sort]] operators from a plan that are accessible from the root operator via + * 0 or more [[Project]], [[Filter]] or [[View]] operators. + */ + private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = { +plan match { + case Sort(_, _, child) => removeTopLevelSorts(child) + case Project(fields, child) => Project(fields, removeTopLevelSorts(child)) + case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child)) + case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child)) + case _ => plan +} + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Subquery(child) => Subquery(removeTopLevelSorts(child)) +case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child)) --- End diff -- Yep, that's why I added the new rule just before `EliminateSubqueryAliases` (which runs in the optimizer, as part of the 'finish analysis' batch). After `EliminateSubqueryAliases` there doesn't seem to be any way to detect subqueries. Another approach I suppose would be to handle this like `SparkPlan`'s `requiredChildOrdering` - if a parent doesn't require any ordering of the child, (and the child is a `Sort` node), the child `Sort` should be dropped. That seems like a more fundamental change though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21049

In SQL, the sort in a subquery doesn't make sense because of the relational model - the output of a subquery is an unordered bag of tuples. Some engines still allow the sort, some silently drop it and some throw an error. For example:

* MariaDB: https://mariadb.com/kb/en/library/why-is-order-by-in-a-from-subquery-ignored/
* SQL Server: https://stackoverflow.com/questions/985921/sql-error-with-order-by-in-subquery

Oracle and Postgres allow the `ORDER BY`.

One issue might be that the underlying dataframe model might not be 100% relational - maybe dataframes _are_ sorted lists of rows and then this optimization would only be valid if using the SQL interface. If so, it's probably not worth the effort to maintain. But if dataframes and SQL relations are supposed to be equivalent, we can drop the `ORDER BY`.

We also may want to decide not to do this because it would surprise users who had been relying on the existing behavior.
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r181563918 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -736,12 +736,15 @@ object EliminateSorts extends Rule[LogicalPlan] { } /** - * Removes Sort operation if the child is already sorted + * Removes redundant Sort operation. This can happen: + * 1) if the child is already sorted + * 2) if the next operator is a Sort itself */ object RemoveRedundantSorts extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => child +case s @ Sort(_, _, Sort(_, _, child)) => s.copy(child = child) --- End diff -- Thanks for doing this! It might be useful to generalise this to any pair of sorts separated by 0 or more projections or filters. I did this for my SPARK-23957 PR, see: https://github.com/henryr/spark/commit/bb992c2058863322a9183b2985806a87729e4168#diff-a636a87d8843eeccca90140be91d4fafR322 What do you think?
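The generalisation proposed above (drop a Sort when another Sort sits above it with only projections or filters in between) can be sketched on a toy plan tree. Everything here is hypothetical and written in Java for illustration: `ConsecutiveSortsSketch`, its `Plan` nodes and `removeRedundantSorts` are not Catalyst classes, and the sketch ignores details such as global versus local sorts.

```java
/** Toy logical plan and rule; none of these types are Spark's. */
final class ConsecutiveSortsSketch {
  enum Kind { RELATION, SORT, PROJECT, FILTER, LIMIT }

  static final class Plan {
    final Kind kind;
    final String arg;   // relation name, sort key, predicate, and so on
    final Plan child;   // null for a relation

    Plan(Kind kind, String arg, Plan child) {
      this.kind = kind;
      this.arg = arg;
      this.child = child;
    }

    @Override
    public String toString() {
      String self = kind + "(" + arg + ")";
      return child == null ? self : self + " <- " + child;
    }
  }

  static Plan relation(String name) { return new Plan(Kind.RELATION, name, null); }
  static Plan sort(String key, Plan child) { return new Plan(Kind.SORT, key, child); }
  static Plan project(String cols, Plan child) { return new Plan(Kind.PROJECT, cols, child); }
  static Plan filter(String cond, Plan child) { return new Plan(Kind.FILTER, cond, child); }
  static Plan limit(int n, Plan child) { return new Plan(Kind.LIMIT, Integer.toString(n), child); }

  /** Drops every Sort reachable from plan through Project and Filter nodes only. */
  private static Plan dropSortsBelow(Plan plan) {
    switch (plan.kind) {
      case SORT:
        return dropSortsBelow(plan.child);
      case PROJECT:
      case FILTER:
        return new Plan(plan.kind, plan.arg, dropSortsBelow(plan.child));
      default:
        // A Limit depends on the order of its input, so stop here.
        return plan;
    }
  }

  /** Applies the rule at every Sort in the tree, top down. */
  static Plan removeRedundantSorts(Plan plan) {
    if (plan.child == null) {
      return plan;
    }
    Plan child = plan.kind == Kind.SORT ? dropSortsBelow(plan.child) : plan.child;
    return new Plan(plan.kind, plan.arg, removeRedundantSorts(child));
  }
}
```

Because `dropSortsBelow` recurses through Sort nodes as well, a chain of three or more sorts collapses to the topmost one in a single pass, while a Limit between two sorts keeps the lower sort in place.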
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181540952 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Couldn't it also be writing to an `OffHeapColumnVector`? https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199 If so, I think the copy is 1MB at a time: https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189 I agree that ByteBuffer shouldn't be supported in this PR. But there's an opportunity to use the bulk copy APIs which would benefit from any future optimization that happens. Plus even if the copy does eventually become a loop inside the column vector implementation, there's more chance of the JIT unrolling the loop since it's smaller. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181528729 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Here and elsewhere a bulk copy has been replaced by many smaller copies. It would be better to be able to use the bulk version. I think it would be preferable to at least have: if (buffer.hasArray()) { c.putIntsLittleEndian(rowId, total, buffer.array(), 0); } else { for (int i = 0 // ... etc } --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r181253369 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] { } } +/** + * Removes Sort operation if the child is already sorted + */ +object RemoveRedundantSorts extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Sort(orders, true, child) if child.outputOrdering.nonEmpty +&& SortOrder.orderingSatisfies(child.outputOrdering, orders) => + child + } --- End diff -- Filed SPARK-23973 for this
[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21049#discussion_r180964957 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] { } } +/** + * Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their + * order. Subqueries produce unordered sets of rows so sorting their output is unnecessary. + */ +object RemoveSubquerySorts extends Rule[LogicalPlan] { + + /** + * Removes all [[Sort]] operators from a plan that are accessible from the root operator via + * 0 or more [[Project]], [[Filter]] or [[View]] operators. + */ + private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = { +plan match { + case Sort(_, _, child) => removeTopLevelSorts(child) + case Project(fields, child) => Project(fields, removeTopLevelSorts(child)) + case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child)) + case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child)) + case _ => plan +} + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Subquery(child) => Subquery(removeTopLevelSorts(child)) +case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child)) --- End diff -- Thanks! I've been trying to understand the role of `Subquery` and `SubqueryAlias`. My confusion is that subqueries do seem to get planned as `SubqueryAlias` operators, e.g.: scala> spark.sql("SELECT count(*) from (SELECT id FROM dft ORDER BY id)").explain(true) == Parsed Logical Plan == 'Project [unresolvedalias('count(1), None)] +- 'SubqueryAlias __auto_generated_subquery_name +- 'Sort ['id ASC NULLS FIRST], true +- 'Project ['id] +- 'UnresolvedRelation `dft` In the example you give I (personally) think it's still reasonable to drop the ordering, but understand that might surprise some users. 
It wouldn't be hard to skip the root if it's a subquery - but what do you propose for detecting subqueries if my method isn't right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
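A minimal sketch of the rule under discussion, written against a toy plan type rather than Catalyst's real classes. Every name below (`RemoveSubquerySortsSketch`, the nested case classes, `optimize`) is a hypothetical stand-in, and the `Limit` case is included only to show where the traversal has to stop.

```scala
object RemoveSubquerySortsSketch {
  // Toy logical-plan ADT; illustrative only, not Catalyst's LogicalPlan.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Sort(order: String, child: Plan) extends Plan
  case class Project(fields: Seq[String], child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case class SubqueryAlias(alias: String, child: Plan) extends Plan

  // Drop sorts reachable from the root through Project / Filter only.
  def removeTopLevelSorts(plan: Plan): Plan = plan match {
    case Sort(_, child)         => removeTopLevelSorts(child)
    case Project(fields, child) => Project(fields, removeTopLevelSorts(child))
    case Filter(cond, child)    => Filter(cond, removeTopLevelSorts(child))
    // Anything else (e.g. Limit) stops the walk: a sort below a limit
    // changes which rows are produced, not just their order.
    case other                  => other
  }

  // Apply the removal below every SubqueryAlias in the tree.
  def optimize(plan: Plan): Plan = plan match {
    case SubqueryAlias(a, c) => SubqueryAlias(a, removeTopLevelSorts(optimize(c)))
    case Sort(o, c)          => Sort(o, optimize(c))
    case Project(f, c)       => Project(f, optimize(c))
    case Filter(p, c)        => Filter(p, optimize(c))
    case Limit(n, c)         => Limit(n, optimize(c))
    case r: Relation         => r
  }

  def main(args: Array[String]): Unit = {
    val plan = SubqueryAlias("s", Sort("id ASC", Project(Seq("id"), Relation("dft"))))
    println(optimize(plan))
  }
}
```

In this toy model a sort at the very root of the query is kept, because only sorts below a `SubqueryAlias` are touched; whether the real rule should skip the root is the open question in the comment above.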
[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/21049 [SPARK-23957][SQL] Remove redundant sort operators from subqueries ## What changes were proposed in this pull request? Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering them is therefore redundant (unless combined with a limit). This patch adds a new optimizer rule that removes sort operators that are directly below subqueries (or some combination of projection and filtering below a subquery). ## How was this patch tested? New unit tests. All sql unit tests pass. You can merge this pull request into a Git repository by running: $ git pull https://github.com/henryr/spark spark-23957 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21049.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21049 commit bb992c2058863322a9183b2985806a87729e4168 Author: Henry Robinson <henry@...> Date: 2018-04-12T03:44:36Z [SPARK-23957][SQL] Remove redundant sort operators from subqueries ## What changes were proposed in this pull request? Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering them is therefore redundant (unless combined with a limit). This patch adds a new optimizer rule that removes sort operators that are directly below subqueries (or some combination of projection and filtering below a subquery). ## How was this patch tested? New unit tests. All sql unit tests pass. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r179003707 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan { override final def children: Seq[LogicalPlan] = Seq(left, right) } + +abstract class KeepOrderUnaryNode extends UnaryNode { --- End diff -- `OrderPreservingUnaryNode`? Or perhaps do you think this would be better modeled as a mixin trait? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180601594 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] { } } +/** + * Removes Sort operation if the child is already sorted + */ +object RemoveRedundantSorts extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Sort(orders, true, child) if child.outputOrdering.nonEmpty +&& SortOrder.orderingSatisfies(child.outputOrdering, orders) => + child + } --- End diff -- You might not want to do it in this PR, but you could easily remove another simple kind of redundant sort, e.g.: `rel.orderBy('a.desc).orderBy('a.asc)` (and I think that `orderBy` is not stable, so any two consecutive `orderBy` operators are redundant). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
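The consecutive-sort case suggested above can be sketched on a toy plan type. This is not Catalyst code: `CollapseSortsSketch` and its case classes are hypothetical, and the rewrite is only valid under the comment's assumption that the outer sort does not depend on the inner one for tie-breaking (that is, that `orderBy` is not stable).

```scala
object CollapseSortsSketch {
  // Toy plan ADT; illustrative only.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Sort(order: Seq[String], child: Plan) extends Plan

  // A Sort directly above another Sort makes the inner one redundant
  // (assuming an unstable sort), so keep only the outer ordering.
  def collapse(plan: Plan): Plan = plan match {
    case Sort(outer, Sort(_, grandchild)) => collapse(Sort(outer, grandchild))
    case Sort(order, child)               => Sort(order, collapse(child))
    case Filter(cond, child)              => Filter(cond, collapse(child))
    case r: Relation                      => r
  }

  def main(args: Array[String]): Unit = {
    // Mirrors rel.orderBy('a.desc).orderBy('a.asc) from the comment.
    val plan = Sort(Seq("a ASC"), Sort(Seq("a DESC"), Relation("rel")))
    println(collapse(plan))
  }
}
```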
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180592716 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, + ShuffleExchangeExec} --- End diff -- revert this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20687 @gatorsmile ok, I think the coverage right now is a reasonable start - the other test cases I can think of would act more like they're exercising the expression-walking code, not the actual simplification. Look forward to collaborating on the follow-up PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20687 @gatorsmile thank you for the reviews! Are there specific test cases you'd like to see? I've checked correlated and uncorrelated subqueries, various flavours of join, aggregates with HAVING clauses, nested compound types, and so on. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r174637789 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -22,54 +22,34 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** -* push down operations into [[CreateNamedStructLike]]. -*/ -object SimplifyCreateStructOps extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformExpressionsUp { - // push down field extraction + * Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions. + */ +object SimplifyExtractValueOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p => +p.transformExpressionsUp { --- End diff -- FWIW I think this is a particular example of a more general problem where expression simplification can break the correspondence between a select expression and its grouping equivalent. Here's a simpler example: `SELECT (a + b) - a FROM t GROUP BY a + b` gets me the following: `org.apache.spark.sql.AnalysisException: expression 't.`b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.` Postgres also has this problem, at least in 9.3: `ERROR: column "t.a" must appear in the GROUP BY clause or be used in an aggregate function Position: 23` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173561557 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** -* push down operations into [[CreateNamedStructLike]]. +* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions. */ -object SimplifyCreateStructOps extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformExpressionsUp { - // push down field extraction +object SimplifyExtractValueOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p => +p.transformExpressionsUp { + // Remove redundant field extraction. case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => createNamedStructLike.valExprs(ordinal) -} - } -} -/** -* push down operations into [[CreateArray]]. -*/ -object SimplifyCreateArrayOps extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformExpressionsUp { - // push down field selection (array of structs) + // Remove redundant array indexing. case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => // instead f selecting the field on the entire array, // select it from each member of the array. // pushing down the operation this way open other optimizations opportunities // (i.e. struct(...,x,...).x) CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name - // push down item selection. + + // Remove redundant map lookup. case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => // instead of creating the array and then selecting one row, // remove array creation altgether. --- End diff -- Done. 
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173561516 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** -* push down operations into [[CreateNamedStructLike]]. +* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions. */ -object SimplifyCreateStructOps extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformExpressionsUp { - // push down field extraction +object SimplifyExtractValueOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p => +p.transformExpressionsUp { + // Remove redundant field extraction. case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => createNamedStructLike.valExprs(ordinal) -} - } -} -/** -* push down operations into [[CreateArray]]. -*/ -object SimplifyCreateArrayOps extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformExpressionsUp { - // push down field selection (array of structs) + // Remove redundant array indexing. case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => --- End diff -- Done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173561415 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** -* push down operations into [[CreateNamedStructLike]]. +* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions. */ --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173323846 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +val structRel = relation + .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 0, None) as "foo") + .groupBy($"foo")("1").analyze +val structExpected = relation + .select('nullable_id as "foo") + .groupBy($"foo")("1").analyze +comparePlans(Optimizer execute structRel, structExpected) + +// If nullable attributes aren't used in the 'expected' plans, the array and map test +// cases fail because array and map indexing can return null so the output attribute --- End diff -- Done, thanks. I filed SPARK-23634 to fix this. Out of interest, why does `AttributeReference` cache the nullability of its referent? Is it because comparison is too expensive to do if you have to follow a level of indirection to get to the original attribute? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173268999 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +val structRel = relation + .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 0, None) as "foo") + .groupBy($"foo")("1").analyze +val structExpected = relation + .select('nullable_id as "foo") + .groupBy($"foo")("1").analyze +comparePlans(Optimizer execute structRel, structExpected) + +// If nullable attributes aren't used in the 'expected' plans, the array and map test +// cases fail because array and map indexing can return null so the output attribute --- End diff -- @cloud-fan I looked again at this briefly this morning. The issue is that it's the `AttributeReference` in the top-level `Aggregate`'s `groupingExpressions` that has inconsistent nullability. The `AttributeReference` in the original plan was originally created with `nullable=true`, before optimization. So at that point it's kind of fixed unless the optimizer dereferences the attr reference and realises that the target is no longer nullable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173013094 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +val structRel = relation + .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 0, None) as "foo") + .groupBy($"foo")("1").analyze +val structExpected = relation + .select('nullable_id as "foo") + .groupBy($"foo")("1").analyze +comparePlans(Optimizer execute structRel, structExpected) + +// If nullable attributes aren't used in the 'expected' plans, the array and map test +// cases fail because array and map indexing can return null so the output attribute --- End diff -- Thanks, that's plenty of information to get started - I'll dig into it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r173007679 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +val structRel = relation + .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 0, None) as "foo") + .groupBy($"foo")("1").analyze +val structExpected = relation + .select('nullable_id as "foo") + .groupBy($"foo")("1").analyze +comparePlans(Optimizer execute structRel, structExpected) + +// If nullable attributes aren't used in the 'expected' plans, the array and map test +// cases fail because array and map indexing can return null so the output attribute --- End diff -- It's a good question! I'm not too familiar with how nullability is marked and unmarked during planning. My understanding is roughly that the analyzer resolves all the plan's expressions and in doing so marks attributes as nullable or not. After that it's not clear that the optimizer revisits any of those nullability decisions. Is there an optimizer pass which should make nullability marking more precise? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r172996211 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** -* push down operations into [[CreateNamedStructLike]]. +* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions. */ object SimplifyCreateStructOps extends Rule[LogicalPlan] { --- End diff -- Good point, done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r172992262 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +330,30 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +// If nullable attributes aren't used, the array and map test cases fail because array +// and map indexing can return null so the output is marked nullable. --- End diff -- The optimization works either way, but in (for example) the map case, `m1` is marked as nullable in the original plan because presumably `GetMapValue(CreateMap(...))` can return `null` if the key is not in the map. So for the expected plan to compare the same as the original, it has to be reading a nullable attribute - otherwise the plans don't pass `comparePlans`. I moved and reworded the comment to hopefully clarify this a bit. There's an opportunity to fix this up again after the rule completes (since some attributes could be marked too conservatively as nullable). Do you think that's something we should pursue for this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
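The nullability mismatch described above can be illustrated with a toy expression type. This is a sketch under assumptions, not Catalyst's implementation: `NullabilitySketch`, `Attr`, `CreateMap` and `GetMapValue` here are simplified stand-ins that only track a `nullable` flag.

```scala
object NullabilitySketch {
  // Toy expression ADT that tracks only nullability.
  sealed trait Expr { def nullable: Boolean }
  case class Attr(name: String, nullable: Boolean) extends Expr
  case class CreateMap(entries: Seq[(String, Expr)]) extends Expr {
    val nullable = false
  }
  case class GetMapValue(map: Expr, key: String) extends Expr {
    // A lookup may miss, so the result is conservatively nullable.
    val nullable = true
  }

  // Replace a lookup into a freshly built map with the stored value.
  def simplify(e: Expr): Expr = e match {
    case GetMapValue(CreateMap(entries), key) =>
      entries.collectFirst { case (k, v) if k == key => v }.getOrElse(e)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val id = Attr("id", nullable = false)
    val original = GetMapValue(CreateMap(Seq("k" -> id)), "k")
    // Before simplification the expression is nullable; afterwards it is not.
    println(s"before: ${original.nullable}, after: ${simplify(original).nullable}")
  }
}
```

Anything that recorded the nullability of the original expression before the rewrite (an attribute reference created during analysis, for example) still says `nullable = true`, which is why the expected plan in the test has to read a nullable attribute for the comparison to pass.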
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20687 This is failing because of SPARK-23606, which seems unrelated (I haven't been able to trigger it in local builds, at least). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20687 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20740 argh, wrong PR, sorry for retest-spam. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20740 Retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/20740 [SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNullValue ## What changes were proposed in this pull request? Parquet 1.9 will change the semantics of Statistics.isEmpty slightly to reflect if the null value count has been set. That breaks a timestamp interoperability test that cares only about whether there are column values present in the statistics of a written file for an INT96 column. Fix by using Statistics.hasNonNullValue instead. ## How was this patch tested? Unit tests continue to pass against Parquet 1.8, and also pass against a Parquet build including PARQUET-1217. You can merge this pull request into a Git repository by running: $ git pull https://github.com/henryr/spark spark-23604 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20740.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20740 commit b86c0bee16f85ad1f7d4e3351647ee5d0582d005 Author: Henry Robinson <henry@...> Date: 2018-03-05T20:59:53Z [SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNullValue ## What changes were proposed in this pull request? Parquet 1.9 will change the semantics of Statistics.isEmpty slightly to reflect if the null value count has been set. That breaks a timestamp interoperability test that cares only about whether there are column values present in the statistics of a written file for an INT96 column. Fix by using Statistics.hasNonNullValue instead. ## How was this patch tested? Unit tests continue to pass against Parquet 1.8, and also pass against a Parquet build including PARQUET-1217. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-25000][SQL] Fix complex type simplificatio...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20687#discussion_r172300096 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -331,4 +331,24 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper { .analyze comparePlans(Optimizer execute rel, expected) } + + test("SPARK-23500: Simplify complex ops that aren't at the plan root") { +val structRel = relation + .select(GetStructField(CreateNamedStruct(Seq("att1", 'id)), 0, None) as "foo") + .select('foo).analyze --- End diff -- Thanks for the pointer. I replaced the projection with an aggregation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20687: [SPARK-25000][SQL] Fix complex type simplification rules...
Github user henryr commented on the issue: https://github.com/apache/spark/pull/20687 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20687: [SPARK-25000][SQL] Fix complex type simplificatio...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/20687 [SPARK-25000][SQL] Fix complex type simplification rules to apply to entire plan ## What changes were proposed in this pull request? Complex type simplification optimizer rules were not applied to the entire plan, just the expressions reachable from the root node. This patch fixes the rules to transform the entire plan. ## How was this patch tested? New unit test + ran sql / core tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/henryr/spark spark-25000 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20687.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20687 commit f446fa2482f2ccdfa6fec7243bb25b2e232da5fa Author: Henry Robinson <henry@...> Date: 2018-02-28T00:42:17Z [SPARK-25000][SQL] Fix complex type simplification rules to apply to entire plan ## What changes were proposed in this pull request? Complex type simplification optimizer rules were not applied to the entire plan, just the expressions reachable from the root node. This patch fixes the rules to transform the entire plan. ## How was this patch tested? New unit test + sql / core tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
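The difference between simplifying only the root node's expressions and transforming the entire plan can be sketched with toy types. Nothing below is Catalyst's API: `TransformWholePlanSketch`, `MakeStruct`, `GetField`, `rootOnly` and `wholePlan` are hypothetical names chosen for illustration.

```scala
object TransformWholePlanSketch {
  // Toy expression and plan ADTs; illustrative only.
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class MakeStruct(fields: Seq[(String, Expr)]) extends Expr
  case class GetField(struct: Expr, name: String) extends Expr

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Project(exprs: Seq[Expr], child: Plan) extends Plan

  // struct(..., name -> v, ...).name  ==>  v
  def simplify(e: Expr): Expr = e match {
    case GetField(MakeStruct(fields), name) =>
      fields.collectFirst { case (n, v) if n == name => v }.getOrElse(e)
    case other => other
  }

  // Rewrites expressions on the root node only; children are left untouched.
  def rootOnly(plan: Plan): Plan = plan match {
    case Project(exprs, child) => Project(exprs.map(simplify), child)
    case other                 => other
  }

  // Rewrites expressions on every node in the plan.
  def wholePlan(plan: Plan): Plan = plan match {
    case Project(exprs, child) => Project(exprs.map(simplify), wholePlan(child))
    case other                 => other
  }

  def main(args: Array[String]): Unit = {
    val redundant = GetField(MakeStruct(Seq("att1" -> Col("id"))), "att1")
    val plan = Project(Seq(Col("foo")), Project(Seq(redundant), Relation("t")))
    println(rootOnly(plan))  // inner projection still contains the redundant extraction
    println(wholePlan(plan)) // inner projection is simplified as well
  }
}
```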
[GitHub] spark pull request #20443: [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkD...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/20443 [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment You can merge this pull request into a Git repository by running: $ git pull https://github.com/henryr/spark SPARK-23157 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20443.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20443 commit 2441e030a5d20ceb6b1c0068e0adf5586c254bb8 Author: Henry Robinson <henry@...> Date: 2018-01-30T22:49:36Z [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20429: [SPARK-23157][SQL] Explain restriction on column ...
GitHub user henryr opened a pull request: https://github.com/apache/spark/pull/20429 [SPARK-23157][SQL] Explain restriction on column expression in withColumn() ## What changes were proposed in this pull request? It's not obvious from the comments that any added column must be a function of the dataset that we are adding it to. Add a comment to that effect to Scala, Python and R Data* methods. You can merge this pull request into a Git repository by running: $ git pull https://github.com/henryr/spark SPARK-23157 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20429.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20429 commit 18f30863a67e1ad49be87765badab69647e45d7a Author: Henry Robinson <henry@...> Date: 2018-01-29T21:51:00Z [SPARK-23157][SQL] Explain restriction on column expression in withColumn() It's not obvious from the comments that any added column must be a function of the dataset that we are adding it to. Add a comment to that effect to Scala, Python and R Data* methods. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20355#discussion_r163455727 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala --- @@ -78,4 +78,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext { } } } + + // Separate test case for text-based formats that support multiLine as an option. + Seq("json", "csv", "text").foreach { format => --- End diff -- That sounds good - my main concern is to make sure that both `multiLine=true` _and_ `multiLine=false` have coverage with a space in the name, since they are such different paths. I'll keep the change that adds a space to `nameWithSpecialChars`, but otherwise have the tests as you suggest - let me know what you think of the next patch! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20355#discussion_r163422934 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala --- @@ -68,13 +68,16 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext { } allFileBasedDataSources.foreach { format => -test(s"SPARK-22146 read files containing special characters using $format") { - val nameWithSpecialChars = s"sp%chars" - withTempDir { dir => -val tmpFile = s"$dir/$nameWithSpecialChars" -spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile) -val fileContent = spark.read.format(format).load(tmpFile) -checkAnswer(fileContent, Seq(Row("a"), Row("b"))) +test(s"SPARK-22146 / SPARK-23148 read files containing special characters using $format") { + val nameWithSpecialChars = s"sp%c hars" + Seq(true, false).foreach { multiline => --- End diff -- Sounds good to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/20355#discussion_r163411267 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala --- @@ -172,6 +172,14 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-23148: test for spaces in file names") { --- End diff -- In the end, to reduce code duplication, I made it so that orc and parquet run multiline as well (I tried to find a neat way to only run multiline if the format was csv, text or json without having a separate test case but it just complicated things). Let me know if you'd rather I have two separate test cases to avoid running the two redundant cases with orc / parquet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...
Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20355#discussion_r163146077

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---
    @@ -172,6 +172,14 @@ class TextSuite extends QueryTest with SharedSQLContext {
         }
       }

    +  test("SPARK-23148: test for spaces in file names") {
    --- End diff --

    @HyukjinKwon good idea, thanks for pointing out that test - how about I just add a space to the special characters string, and have the existing test also test multiline on/off for text, csv and json?
[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...
GitHub user henryr opened a pull request:

    https://github.com/apache/spark/pull/20355

    SPARK-23148: [SQL] Allow pathnames with special characters for CSV / JSON / text

    ## What changes were proposed in this pull request?

    Fix for JSON and CSV data sources when file names include characters that would be changed by URL encoding.

    ## How was this patch tested?

    New unit tests for JSON, CSV and text suites

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/henryr/spark spark-23148

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20355

commit 98da4cd4b865ef8944a10a24ee709ed9ee0a4721
Author: Henry Robinson <henry@...>
Date:   2018-01-19T22:55:53Z

    SPARK-23148: [SQL] Allow pathnames with special characters for CSV / JSON / text

    ## What changes were proposed in this pull request?

    Fix for JSON and CSV data sources when file names include characters that would be changed by URL encoding.

    ## How was this patch tested?

    New unit tests for JSON, CSV and text suites
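To make "characters that would be changed by URL encoding" concrete, here is a small sketch using only the JDK's `java.net` classes. It is not the code changed by this pull request; the object and method names are hypothetical, and the sample name `sp%c hars` is borrowed from the test string discussed in the review.

```scala
import java.net.{URI, URLEncoder}

// Illustrative sketch only; not the code touched by the pull request.
object PathEncodingSketch {
  // Form-style URL encoding: '%' becomes "%25" and ' ' becomes '+'.
  def urlEncode(name: String): String =
    URLEncoder.encode(name, "UTF-8")

  // The multi-argument URI constructor percent-quotes characters that are
  // illegal in a path, and always quotes '%' itself.
  def toFileUri(absolutePath: String): URI =
    new URI("file", null, absolutePath, null)
}
```

For example, `PathEncodingSketch.urlEncode("sp%c hars")` yields `sp%25c+hars`, and for `PathEncodingSketch.toFileUri("/tmp/sp%c hars")` the raw path is `/tmp/sp%25c%20hars` while `getPath` returns the original `/tmp/sp%c hars`. Code that treats the encoded form as if it were the literal file name (or the reverse) ends up looking for a file that does not exist, which is the general class of problem the description refers to.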
[GitHub] spark issue #20254: [SPARK-23062][SQL] Improve EXCEPT documentation
Github user henryr commented on the issue:

    https://github.com/apache/spark/pull/20254

    Thanks all for the pointers and feedback! I've removed the references to the behavior before 2.0, and now the changes just make it explicit that this is `EXCEPT DISTINCT`. (I appreciate that that's the meaning of `EXCEPT` per ANSI, but the behavior change since 1.x has confused users I've spoken to, so it seems worthwhile to make the documentation as clear as possible.)
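As a rough illustration of what `EXCEPT DISTINCT` means, here is a plain-Scala analogy over in-memory sequences. It is not Spark code and does not describe what any particular Spark version did; the object and method names are hypothetical. A duplicate-preserving difference (what SQL calls `EXCEPT ALL`) is included only for contrast.

```scala
// Illustrative analogy only; plain Scala collections, not the Spark API.
object ExceptSketch {
  // EXCEPT DISTINCT: the distinct rows of `left` that do not occur in `right`.
  def exceptDistinct[A](left: Seq[A], right: Seq[A]): Seq[A] = {
    val exclude = right.toSet
    left.distinct.filterNot(exclude)
  }

  // Duplicate-preserving difference, shown for contrast: each occurrence in
  // `right` cancels at most one matching occurrence in `left`.
  def exceptAll[A](left: Seq[A], right: Seq[A]): Seq[A] =
    left.diff(right)
}
```

With `left = Seq(1, 1, 2, 3)` and `right = Seq(3)`, `exceptDistinct` gives `Seq(1, 2)` whereas `exceptAll` gives `Seq(1, 1, 2)`. The duplicate `1` is the kind of difference that can surprise users who expect duplicates to survive.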