[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22318

@maropu, I don't think we can. Actually, this is how we deal with [simpler joins](https://github.com/apache/spark/pull/22318#issuecomment-427080091). Do you think changing the behaviour is unacceptable?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22617: [SPARK-25484][SQL][TEST] Refactor ExternalAppendO...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22617#discussion_r229480460

Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala

```diff
@@ -68,9 +100,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(
-          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
-          numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
```

--- End diff --

Ok. Reverted the change.
[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22789

Thanks @mgaido91 @hvanhovell for the review.
[GitHub] spark pull request #22617: [SPARK-25484][SQL][TEST] Refactor ExternalAppendO...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22617#discussion_r228944791

Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala

```diff
@@ -68,9 +100,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(
-          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
-          numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
```

--- End diff --

Actually, following my logic in the description, I think

```
val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
```

should be changed to

```
val array = new ExternalAppendOnlyUnsafeRowArray(0, numSpillThreshold)
```

in `testAgainstRawUnsafeExternalSorter` in the "WITH SPILL" cases, so that `ExternalAppendOnlyUnsafeRowArray` is compared to `UnsafeExternalSorter` when it behaves like one. But it would be great if someone could confirm this idea.
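The two constructor arguments debated above are an initial in-memory buffer size and a spill threshold. The following toy class is only an illustrative sketch of those roles (the real `ExternalAppendOnlyUnsafeRowArray` lives in Spark and is more involved); it shows why passing `0` as the first argument forces the "spill-like" path from the first rows onward:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of an append-only array with an initial in-memory buffer size
// and a threshold beyond which further rows are "spilled". Names and
// behaviour are illustrative only, not Spark's implementation.
class ToyAppendOnlyArray(initialBufferSize: Int, spillThreshold: Int) {
  private val inMemory = new ArrayBuffer[Long](initialBufferSize)
  private var spilled = 0L

  def add(row: Long): Unit =
    if (inMemory.size < spillThreshold) inMemory += row
    else spilled += 1 // past the threshold: row goes to the spill store

  def inMemoryCount: Int = inMemory.size
  def spilledCount: Long = spilled
}
```

With `spillThreshold` rows or more, the benchmark case exercises the spill path regardless of the initial buffer size, which is why the comment argues the "WITH SPILL" cases should use `(0, numSpillThreshold)` to match `UnsafeExternalSorter`'s behaviour.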
[GitHub] spark pull request #22617: [SPARK-25484][SQL][TEST] Refactor ExternalAppendO...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22617#discussion_r228889856

Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala

```diff
@@ -68,9 +100,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(
-          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
-          numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
```

--- End diff --

Thanks for the feedback @dongjoon-hyun. I added some details to the description.
[GitHub] spark issue #22817: [SPARK-25816][SQL] Fix attribute resolution in nested ex...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

Thanks @dongjoon-hyun, @gatorsmile, @cloud-fan, @hvanhovell for the review.
[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22789#discussion_r228753985

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

```diff
@@ -146,7 +146,10 @@ trait CodegenSupport extends SparkPlan {
     if (outputVars != null) {
       assert(outputVars.length == output.length)
       // outputVars will be used to generate the code for UnsafeRow, so we should copy them
-      outputVars.map(_.copy())
+      outputVars.map(_.copy()) match {
```

--- End diff --

What I found is that in `prepareRowVar` the `evaluateVariables` call clears the code of `outputVars`.
[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22789#discussion_r228753682

Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala

```diff
@@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
     }
   }
+
+  test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") {
```

--- End diff --

Strange, I've just rerun it on master and got:

```
08:34:45.754 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 81, Column 17: Expression "bhj_isNull_6" is not an rvalue
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 81, Column 17: Expression "bhj_isNull_6" is not an rvalue
```
[GitHub] spark issue #22617: [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsa...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22617

@dongjoon-hyun, @kiszk, could you please help me take this PR a step forward?
[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22789

@kiszk, @mgaido91, @hvanhovell, is there anything I can add to this PR?
[GitHub] spark pull request #22817: [SPARK-25816][SQL] Fix attribute resolution in ne...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22817#discussion_r228737835

Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

```diff
@@ -2578,4 +2578,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Row ("abc", 1))
     }
   }
+
+  test("SPARK-25816 ResolveReferences works with nested extractors") {
+    val df0 = Seq((1, Map(1 -> "a")), (2, Map(2 -> "b"))).toDF("1", "2")
+    val df1 = df0.select($"1".as("2"), $"2".as("1"))
+    val df2 = df1.filter($"1"(map_keys($"1")(0)) > "a")
```

--- End diff --

@cloud-fan unfortunately, that would be a bit too much of a simplification. In your example, `ResolveMissingReferences` after `ResolveReferences` is able to resolve the `extraction` correctly even without this PR. The issue comes up when we have another level of projection before the extractor. I simplified the example a bit, though.
[GitHub] spark issue #22817: [SPARK-25816][SQL] Fix attribute resolution in nested ex...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

@hvanhovell @gatorsmile I think this is a regression from 2.2 to 2.3.
[GitHub] spark issue #22804: [SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExe...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22804

Thanks @dongjoon-hyun, @wangyum for the review.
[GitHub] spark issue #22817: [SPARK-25816][SQL] Fix attribute resolution in nested ex...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

@gatorsmile, I looked into this, and it seems that if we use `mapChildren` in `ResolveReferences`, then `UnresolvedExtractValue` should define 2 children.
[GitHub] spark pull request #22817: [SPARK-25816][SQL] Fix attribute resolution in ne...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22817#discussion_r228285647

Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

```diff
@@ -407,7 +407,10 @@ case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star with Une
  * can be key of Map, index of Array, field name of Struct.
  */
 case class UnresolvedExtractValue(child: Expression, extraction: Expression)
-  extends UnaryExpression with Unevaluable {
+  extends BinaryExpression with Unevaluable {
```

--- End diff --

If this change is not allowed, we can override `children` instead.
[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

I will try to investigate a bit more and come up with another solution.
[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

So based on the UT results, it seems that simply changing the resolution to bottom-up causes issues with `LambdaFunction`s in the current version of Spark. The issue seems to be a regression caused by this commit: https://github.com/apache/spark/commit/36b826f5d17ae7be89135cb2c43ff797f9e7fe48

But reverting to bottom-up simply isn't viable in `ResolveReferences` since `LambdaFunction`s were introduced. The issue is that `ResolveReferences` has to be top-down and stop traversing the expression tree at an unbound `LambdaFunction`, as a parameter attribute of a function can collide with an attribute from a DF. Such a colliding parameter attribute should not be resolved in `ResolveReferences` but in `ResolveLambdaVariables`.
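The "top-down, but stop at unbound lambdas" constraint described above can be sketched with a toy expression tree. This is only an illustration of the traversal rule, not Spark's analyzer (the node and field names here are made up):

```scala
// Toy expression tree, illustrative names only.
sealed trait Expr
case class Attr(name: String, resolved: Boolean = false) extends Expr
case class Lambda(param: String, body: Expr, bound: Boolean = false) extends Expr
case class Call(args: Seq[Expr]) extends Expr

// Resolve attributes top-down, but do NOT descend into an unbound Lambda:
// its parameter may shadow a DataFrame attribute, so it is left for a later
// rule (ResolveLambdaVariables in Spark's analyzer).
def resolve(e: Expr): Expr = e match {
  case a: Attr               => a.copy(resolved = true)
  case l: Lambda if !l.bound => l // stop here; the lambda rule handles it
  case l: Lambda             => l.copy(body = resolve(l.body))
  case c: Call               => c.copy(args = c.args.map(resolve))
}
```

A naive bottom-up (or unconditional) traversal would resolve the lambda's parameter attribute against the DataFrame's schema, which is exactly the collision the comment warns about.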
[GitHub] spark issue #22804: [SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExe...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22804

Thanks @dongjoon-hyun for the fixes. Merged.
[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22817

Thanks @gatorsmile. I thought the issue in SPARK-25816 and in the added UT is caused by the top-down traversal. I thought that `UnresolvedExtractValue(child, fieldExpr) if child.resolved` could be resolved if we gave resolving `child` a try first, so I switched to bottom-up. But I see your point that top-down may be required in other cases.
[GitHub] spark pull request #22817: [SPARK-25816][SQL] ResolveReferences should work ...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22817

[SPARK-25816][SQL] ResolveReferences should work in a bottom-up manner on expressions

## What changes were proposed in this pull request?

ResolveReferences works in a top-down manner when resolving the expression tree. This PR changes the resolution order to bottom-up and so solves some issues. This PR also replaces the `resolve` method with the more general `transformExpressionsUp`.

## How was this patch tested?

Added UT.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25816

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22817.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22817

commit f62725f5378f365c58b4b01f2a078e5e93176cda
Author: Peter Toth
Date: 2018-10-24T18:46:07Z

    [SPARK-25816][SQL] ResolveReferences works bottom-up on expressions
[GitHub] spark pull request #22804: [SPARK-25665][SQL][TEST] Refactor ObjectHashAggre...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22804#discussion_r227470048

Diff: sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala (the archived diff is truncated below)

```diff
@@ -21,207 +21,212 @@
 import scala.concurrent.duration._

 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox

-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
-import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.hive.execution.TestingTypedCount
-import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType

-class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton {
-  ignore("Hive UDAF vs Spark AF") {
-    val N = 2 << 15
-
-    val benchmark = new Benchmark(
-      name = "hive udaf vs spark af",
-      valuesPerIteration = N,
-      minNumIters = 5,
-      warmupTime = 5.seconds,
-      minTime = 10.seconds,
-      outputPerIteration = true
-    )
-
-    registerHiveFunction("hive_percentile_approx", classOf[GenericUDAFPercentileApprox])
-
-    sparkSession.range(N).createOrReplaceTempView("t")
-
-    benchmark.addCase("hive udaf w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      sparkSession.sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect()
-    }
-
-    benchmark.addCase("spark af w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.sql("SELECT percentile_approx(id, 0.5) FROM t").collect()
-    }
-
-    benchmark.addCase("hive udaf w/ group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      sparkSession.sql(
-        s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)"
-      ).collect()
-    }
-
-    benchmark.addCase("spark af w/ group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.sql(
-        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)"
-      ).collect()
-    }
-
-    benchmark.addCase("spark af w/ group by w/ fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
-      sparkSession.sql(
-        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)"
-      ).collect()
-    }
-
-    benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
-    hive udaf vs spark af:              Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------
-    hive udaf w/o group by                    5326 / 5408         0.0       81264.2       1.0X
-    spark af w/o group by                       93 /  111         0.7        1415.6      57.4X
-    hive udaf w/ group by                     3804 / 3946         0.0       58050.1       1.4X
-    spark af w/ group by w/o fallback           71 /   90         0.9        1085.7      74.8X
-    spark af w/ group by w/ fallback            98 /  111         0.7        1501.6      54.1X
-    */
-  }
-
-  ignore("ObjectHashAggregateExec vs SortAggregateExec - typed_count") {
-    val N: Long = 1024 * 1024 * 100
-
-    val benchmark = new Benchmark(
-      name = "object agg v.s. sort agg",
-      valuesPerIteration = N,
-      minNumIters = 1,
-      warmupTime = 10.seconds,
-      minTime = 45.seconds,
-      outputPerIteration = true
-    )
-
-    import
```
[GitHub] spark pull request #22804: [SPARK-25665][SQL][TEST] Refactor ObjectHashAggre...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22804

[SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExecBenchmark to…

## What changes were proposed in this pull request?

Refactor ObjectHashAggregateExecBenchmark to use main method.

## How was this patch tested?

Manually tested:

```
bin/spark-submit --class org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark --jars sql/catalyst/target/spark-catalyst_2.11-3.0.0-SNAPSHOT-tests.jar,core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar,sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT.jar --packages org.spark-project.hive:hive-exec:1.2.1.spark2 sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT-tests.jar
```

Generated results with:

```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark"
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25665

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22804.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22804

commit cf2bb2c0bea88110d0b20347177bafa4f129499c
Author: Peter Toth
Date: 2018-10-14T14:19:52Z

    [SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExecBenchmark to use main method
[GitHub] spark pull request #22789: [SPARK-25767][SQL] fix inputVars preparation if o...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22789

[SPARK-25767][SQL] fix inputVars preparation if outputVars is a lazy stream

## What changes were proposed in this pull request?

Code generation is incorrect if the `outputVars` parameter of the `consume` method in `CodegenSupport` contains a lazily evaluated stream of expressions. This PR fixes the issue by forcing the evaluation of `inputVars` before generating the code for UnsafeRow.

## How was this patch tested?

Tested with the sample program provided in https://issues.apache.org/jira/browse/SPARK-25767

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25767

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22789.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22789

commit 2ca1795da0af56021e7002bcae8ed8545eb72b6d
Author: Peter Toth
Date: 2018-10-21T20:53:08Z

    [SPARK-25767][SQL] fix inputVars preparation if outputVars is a lazy stream

    Change-Id: Iac584a018f9892367841357c667ccaec1c15047b
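The class of bug this PR fixes can be reproduced with plain Scala, independently of Spark: `Stream.map` defers the side effects of the mapping function, so code that relies on those side effects having happened (as Spark's code generation does) sees stale state until the stream is forced. A minimal illustration:

```scala
// Stream is head-strict but tail-lazy: map runs its function on the head
// immediately, but on the remaining elements only when they are demanded.
var mapped = 0
val outputVars = Stream(1, 2, 3).map { i => mapped += 1; i }

// Only the head has been mapped so far.
println(mapped) // 1

// Forcing the whole collection (the moral equivalent of the fix, which
// forces evaluation before code generation) runs the remaining effects.
outputVars.toIndexedSeq
println(mapped) // 3
```

In the Spark bug the "side effect" is mutating per-expression codegen state, so generating UnsafeRow code from an unforced stream produced references to variables whose code had never been emitted.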
[GitHub] spark issue #22766: [SPARK-25768][SQL] fix constant argument expecting UDAFs
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22766

cc @cloud-fan I believe this is a regression caused by https://issues.apache.org/jira/browse/SPARK-18186
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22766#discussion_r226345858

Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala

```diff
@@ -339,40 +339,38 @@ private[hive] case class HiveUDAFFunction(
     val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
     resolver.getEvaluator(parameterInfo)
   }
+
+  private case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector)
```

--- End diff --

The evaluator and the object inspector need to be initialized together.
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22766

[SPARK-25768][SQL] fix constant argument expecting UDAFs

## What changes were proposed in this pull request?

This change makes all fields of `HiveUDAFFunction` lazy.

## How was this patch tested?

Added new UT.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25768

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22766.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22766

commit 0305497e8d3c8f66340d3055e6a226bd6ee2922a
Author: Peter Toth
Date: 2018-10-18T13:32:42Z

    [SPARK-25768][SQL] fix constant argument expecting UDFs
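The mechanism behind the fix, making fields `lazy`, defers their initialization from construction time to first use. The sketch below is only an illustration of that Scala language feature (the class and field names are made up, not Spark's): an eager `val` would run the initializer before the constant arguments are bound, while a `lazy val` waits until they are:

```scala
// Illustrative only: a field whose initializer requires arguments to be
// bound first, mimicking an evaluator that needs constant-folded arguments.
class LazyInitFunction(argBound: () => Boolean) {
  // As an eager `val` this would run at construction time and fail;
  // as a `lazy val` it runs on first access, after binding has happened.
  lazy val evaluator: String = {
    require(argBound(), "arguments must be bound before creating the evaluator")
    "initialized"
  }
}

var bound = false
val f = new LazyInitFunction(() => bound) // no failure: lazy val untouched
bound = true                              // e.g. constant folding has run
println(f.evaluator)                      // prints "initialized"
```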
[GitHub] spark issue #22664: [SPARK-25662][SQL][TEST] Refactor DataSourceReadBenchmar...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22664

Thanks for the review @dongjoon-hyun and @dbtsai. I have one question though: I still don't see https://issues.apache.org/jira/browse/SPARK-25662 assigned to me. Could you please look into it?
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22318

@srowen, I saw your last comment on https://github.com/peter-toth/spark/tree/SPARK-25150. I submitted this PR to solve that ticket, and I believe the description here explains what the real issue is there. I would appreciate your thoughts on this PR; unfortunately, it has got stuck a bit lately. Thanks.
[GitHub] spark pull request #22664: [SPARK-25662][TEST] Refactor DataSourceReadBenchm...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22664#discussion_r22366

Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala

```diff
@@ -34,10 +34,15 @@ import org.apache.spark.sql.vectorized.ColumnVector
 /**
  * Benchmark to measure data source read performance.
- * To run this:
- * spark-submit --class
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt: bin/spark-submit --class
```

--- End diff --

```
bin/spark-submit --class org.apache.spark.sql.execution.benchmark.DataSourceReadBenchmark --jars core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar,sql/catalyst/target/spark-catalyst_2.11-3.0.0-SNAPSHOT-tests.jar sql/core/target/spark-sql_2.11-3.0.0-SNAPSHOT-tests.jar
```

does work for me, but I checked in `FilterPushdownBenchmark` and it seems we don't mention other required jars. Shall I modify the command?
[GitHub] spark pull request #22664: [SPARK-25662][TEST] Refactor DataSourceReadBenchm...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22664

[SPARK-25662][TEST] Refactor DataSourceReadBenchmark to use main method

## What changes were proposed in this pull request?

1. Refactor DataSourceReadBenchmark

## How was this patch tested?

Manually tested and regenerated results.

```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.DataSourceReadBenchmark"
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25662

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22664.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22664

commit 0f82794c67749c4dad65d5672a853d04096a9785
Author: Peter Toth
Date: 2018-10-07T15:01:05Z

    [SPARK-25662][TEST] Refactor DataSourceReadBenchmark to use main method

    Change-Id: Icfd0484c8e0fef2ed0b184e09e52db9432e0a250
[GitHub] spark issue #22603: [SPARK-25062][SQL] Clean up BlockLocations in InMemoryFi...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22603

Thanks @dongjoon-hyun, `petertoth` is my JIRA user id.
[GitHub] spark issue #22603: [SPARK-25062][SQL] Clean up BlockLocations in InMemoryFi...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22603

Thanks @cloud-fan for the review. I've fixed your findings.
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22318

Also please consider that currently (and also after this PR), using `b` and `c` from the description:

```
b.join(c, b("id") === b("id"), "inner")
```

is interpreted as

```
b.join(c, b("id") === c("id"), "inner")
```

so my PR just extends this feature to the case with subsequent joins. @cloud-fan, @viirya what do you think?
[GitHub] spark issue #22603: [SPARK-25062][SQL] Clean up BlockLocations in InMemoryFi...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22603

Thanks @dongjoon-hyun for the review. I've fixed your findings.
[GitHub] spark issue #22617: [SPARK-25484][TEST] Refactor ExternalAppendOnlyUnsafeRow...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22617

cc @dongjoon-hyun @seancxmao
[GitHub] spark pull request #22617: [SPARK-25484][TEST] Refactor ExternalAppendOnlyUn...
GitHub user peter-toth opened a pull request: https://github.com/apache/spark/pull/22617

[SPARK-25484][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark

## What changes were proposed in this pull request?

1. Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
2. Fixes issue in `ExternalAppendOnlyUnsafeRowArray` creation

## How was this patch tested?

Manually tested and regenerated results.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25484

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22617.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22617

commit b9991cac48b6e28c2dfdbdd66e4f26008283643f
Author: Peter Toth
Date: 2018-10-02T14:31:44Z

    [SPARK-25484][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22318

Thanks @viirya, your analysis is correct. Unfortunately, an attribute doesn't have a reference to its dataset, so I don't think this scenario can be solved easily. I believe the good solution would be something like https://github.com/apache/spark/pull/21449. But I would argue that my PR still makes sense: an `a("id") === b("id")` condition in a join where `c` is joined is not expected, and is actually very likely a typo.
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue: https://github.com/apache/spark/pull/22318

@cloud-fan, could you please help me take this PR one step forward?
[GitHub] spark pull request #22603: SPARK-25062: clean up BlockLocations in InMemoryF...
Github user peter-toth commented on a diff in the pull request: https://github.com/apache/spark/pull/22603#discussion_r221898450

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

```diff
@@ -315,7 +315,12 @@ object InMemoryFileIndex extends Logging {
       // which is very slow on some file system (RawLocalFileSystem, which is launch a
       // subprocess and parse the stdout).
       try {
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+        val locations = fs.getFileBlockLocations(f, 0, f.getLen).map(
+          loc => if (loc.getClass == classOf[BlockLocation]) {
```

--- End diff --

:thumbsup:
[GitHub] spark pull request #22603: SPARK-25062: clean up BlockLocations in InMemoryF...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22603#discussion_r221890344

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -315,7 +315,12 @@ object InMemoryFileIndex extends Logging {
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
         try {
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map(
+            loc => if (loc.getClass == classOf[BlockLocation]) {
--- End diff --

Thanks @mgaido91, but `loc` is always an instance of `BlockLocation` (it might be a subclass such as `HdfsBlockLocation`), so `isInstanceOf[BlockLocation]` or pattern matching would always return true. I want to test that the class of `loc` is exactly `BlockLocation`, and if it is, we don't need to convert it.
[GitHub] spark pull request #22603: SPARK-25062: clean up BlockLocations in InMemoryF...
GitHub user peter-toth opened a pull request:
https://github.com/apache/spark/pull/22603

SPARK-25062: clean up BlockLocations in InMemoryFileIndex

## What changes were proposed in this pull request?

`InMemoryFileIndex` caches `FileStatus` objects to paths. Each `FileStatus` object can contain several `BlockLocations`. Depending on the parallel discovery threshold (`spark.sql.sources.parallelPartitionDiscovery.threshold`), the file listing can happen on the driver or on the executors. If the listing happens on the executors, the block location objects are converted to simple `BlockLocation` objects to meet serialization requirements. If it happens on the driver, there is no conversion, and depending on the file system a `BlockLocation` object can be a subclass like `HdfsBlockLocation` that consumes more memory. This PR adds the conversion to the latter case as well and decreases memory consumption.

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25062

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22603.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22603

commit 5735dba9a43c336843ccc531831105d6c23b4586
Author: Peter Toth
Date: 2018-09-30T13:01:36Z

    SPARK-25062: clean up BlockLocations in InMemoryFileIndex
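The exact-class check and conversion described above can be illustrated without Spark or Hadoop. A minimal sketch, using hypothetical `BlockLoc`/`HdfsBlockLoc` classes standing in for Hadoop's `BlockLocation` and `HdfsBlockLocation`: a plain instance is kept as-is, while a subclass instance is copied down to the base class, dropping any subclass-only state that would otherwise stay in the cache.

```scala
// Toy stand-ins for Hadoop's BlockLocation / HdfsBlockLocation (hypothetical names).
class BlockLoc(val hosts: Array[String], val offset: Long, val length: Long)
class HdfsBlockLoc(hosts: Array[String], offset: Long, length: Long, val extra: AnyRef)
  extends BlockLoc(hosts, offset, length)

// Exact-class check: note isInstanceOf[BlockLoc] would be true for both classes,
// so getClass equality is used instead to detect "already plain" instances.
def toPlain(loc: BlockLoc): BlockLoc =
  if (loc.getClass == classOf[BlockLoc]) loc // already plain, no copy needed
  else new BlockLoc(loc.hosts, loc.offset, loc.length) // drop subclass payload

val hdfsLoc = new HdfsBlockLoc(Array("host1"), 0L, 128L, new Object)
val plain = toPlain(hdfsLoc)
assert(plain.getClass == classOf[BlockLoc]) // subclass converted to base class
assert(plain.length == 128L)                // block metadata preserved
```

The same shape as the PR's change: only instances whose runtime class is exactly the base class skip the copy, so the cached objects are always the lean base-class representation.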
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22518#discussion_r219617722

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---
@@ -166,7 +168,7 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
       val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
       val sameResult = sameSchema.find(_.sameResult(sub.plan))
       if (sameResult.isDefined) {
-        sub.withNewPlan(sameResult.get)
+        sub.withNewPlan(sameResult.get).withNewExprId()
--- End diff --

Can we avoid the double copy()? Or is it cleaner this way?
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22518#discussion_r219616464

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       assert(getNumSortsInQuery(query5) == 1)
     }
   }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2", "t3") {
--- End diff --

There is no need for "t3".
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue:
https://github.com/apache/spark/pull/22318

@cloud-fan, does the new description define the scope as you suggested? Is there anything I can add to this PR?
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue:
https://github.com/apache/spark/pull/22318

@cloud-fan, I added some explanation to the description about the cases in which this PR helps and also where it doesn't.
[GitHub] spark issue #22318: [SPARK-25150][SQL] Rewrite condition when deduplicate Jo...
Github user peter-toth commented on the issue:
https://github.com/apache/spark/pull/22318

@cloud-fan this PR doesn't solve that question. There are some hacks in `Dataset.join` to handle `EqualTo` and `EqualNullSafe` with duplicated attributes and those hacks are still required with this PR. But I believe there are other initiatives to solve that long-standing issue like https://github.com/apache/spark/pull/21449
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215571877

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +924,18 @@ class Analyzer(
           failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

       // To resolve duplicate expression IDs for Join and Intersect
-      case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-        j.copy(right = dedupRight(left, right))
+      case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+        val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+        val changedCondition = condition.map(_.transform {
+          case attr: Attribute if attr.resolved => attributeRewrites.getOrElse(attr, attr)
--- End diff --

changed to `dedupAttr()`
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215571612

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
        * that this rule cannot handle. When that is the case, there must be another rule
        * that resolves these conflicts. Otherwise, the analysis will fail.
        */
-      right
+      (right, AttributeMap.empty)
--- End diff --

ok, reverted
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215571667

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala ---
@@ -23,12 +23,14 @@ package org.apache.spark.sql.catalyst.expressions
  * of the name, or the expected nullability).
  */
 object AttributeMap {
+  val empty = new AttributeMap(Map.empty)
+
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
 }

-class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
+class AttributeMap[+A](val baseMap: Map[ExprId, (Attribute, A)])
--- End diff --

done
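The `+A` in the diff above is a covariance annotation, and it is what makes a single shared `empty` value work. A minimal sketch with a hypothetical `SimpleMap` standing in for Spark's `AttributeMap` (the real class keys by `ExprId`; this toy keys by `String`): with covariance, one `SimpleMap[Nothing]` can be used wherever any element type is expected, which an invariant parameter would reject at compile time.

```scala
// Toy stand-in for AttributeMap (hypothetical): +A makes the type covariant.
class SimpleMap[+A](val base: Map[String, A]) {
  // B >: A keeps the method type-correct under covariance (mirrors Map.getOrElse).
  def getOrElse[B >: A](key: String, default: => B): B = base.getOrElse(key, default)
}

object SimpleMap {
  // One shared empty instance; SimpleMap[Nothing] conforms to SimpleMap[T] for any T.
  val empty: SimpleMap[Nothing] = new SimpleMap(Map.empty)
}

// Compiles only because A is covariant; with class SimpleMap[A] this line is a type error.
val m: SimpleMap[String] = SimpleMap.empty
assert(m.getOrElse("id", "fallback") == "fallback") // empty map falls through to the default
```

This is why the PR could replace per-call-site `new AttributeMap[Attribute](Map.empty)` allocations with a single shared `empty` val.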
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215571480

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala ---
@@ -23,12 +23,14 @@ package org.apache.spark.sql.catalyst.expressions
  * of the name, or the expected nullability).
  */
 object AttributeMap {
+  val empty = new AttributeMap(Map.empty)
--- End diff --

Reverted to the previous one where we have one val in ResolveReferences.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215504208

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +924,18 @@ class Analyzer(
           failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

       // To resolve duplicate expression IDs for Join and Intersect
-      case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-        j.copy(right = dedupRight(left, right))
+      case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+        val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+        val changedCondition = condition.map(_.transform {
+          case attr: Attribute if attr.resolved => attributeRewrites.getOrElse(attr, attr)
--- End diff --

But actually there might be an issue here: instead of `attributeRewrites.getOrElse(attr, attr)` I might need to use `dedupAttr(attr, attributeRewrites)`, which retains the attribute qualifier.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215503790

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +924,18 @@ class Analyzer(
           failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

       // To resolve duplicate expression IDs for Join and Intersect
-      case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-        j.copy(right = dedupRight(left, right))
+      case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+        val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+        val changedCondition = condition.map(_.transform {
+          case attr: Attribute if attr.resolved => attributeRewrites.getOrElse(attr, attr)
--- End diff --

Simply, they will be resolved later. `attributeRewrites` returned by `dedupRight()` can contain only resolved attributes.
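The condition rewrite being discussed can be sketched on a toy expression tree (hypothetical `Attr`/`EqualTo`/`And` case classes, not Spark's `Expression` API): attribute leaves found in the rewrite map are replaced with their deduplicated counterparts, and everything else recurses into its children, which is the shape of the `condition.map(_.transform { ... })` call in the diff.

```scala
// Toy expression tree (hypothetical types); Spark's real trees use Expression/Attribute.
sealed trait Expr
case class Attr(name: String, exprId: Int) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Replace attribute leaves per the rewrite map; recurse into all other nodes.
def rewrite(e: Expr, rewrites: Map[Attr, Attr]): Expr = e match {
  case a: Attr           => rewrites.getOrElse(a, a)
  case EqualTo(l, r)     => EqualTo(rewrite(l, rewrites), rewrite(r, rewrites))
  case And(l, r)         => And(rewrite(l, rewrites), rewrite(r, rewrites))
}

// Deduplication gave c's attribute a fresh expression id (3 -> 4)...
val rewrites = Map(Attr("id", 3) -> Attr("id", 4))
// ...so a condition like a.id = b.id AND a.id = c.id must be rewritten too:
val cond = And(EqualTo(Attr("id", 1), Attr("id", 2)), EqualTo(Attr("id", 1), Attr("id", 3)))
assert(rewrite(cond, rewrites) ==
  And(EqualTo(Attr("id", 1), Attr("id", 2)), EqualTo(Attr("id", 1), Attr("id", 4))))
```

Without this rewrite the condition would keep referencing the old expression id, which no longer appears in the deduplicated right side of the join.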
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215499599

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala ---
@@ -23,12 +23,14 @@ package org.apache.spark.sql.catalyst.expressions
  * of the name, or the expected nullability).
  */
 object AttributeMap {
+  val empty = new AttributeMap(Map.empty)
--- End diff --

@gatorsmile this was the initial version. Then came the idea from @mgaido91 and @maropu to use a val (instead of a def) either here or in ResolveReferences to spare some empty() calls.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215274203

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -754,11 +754,14 @@ class Analyzer(
      * a logical plan node's children.
      */
     object ResolveReferences extends Rule[LogicalPlan] {
+
--- End diff --

fixed
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215274137

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala ---
@@ -23,12 +23,14 @@ package org.apache.spark.sql.catalyst.expressions
  * of the name, or the expected nullability).
  */
 object AttributeMap {
+  var empty = new AttributeMap(Map.empty)
--- End diff --

oh thanks, fixed
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215255291

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -754,11 +754,16 @@ class Analyzer(
      * a logical plan node's children.
      */
     object ResolveReferences extends Rule[LogicalPlan] {
+
+    private val emptyAttrMap = new AttributeMap[Attribute](Map.empty)
--- End diff --

done
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Rewrite condition when dedupli...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r215189187

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -754,11 +754,16 @@ class Analyzer(
      * a logical plan node's children.
      */
     object ResolveReferences extends Rule[LogicalPlan] {
+
+    private val emptyAttrMap = new AttributeMap[Attribute](Map.empty)
--- End diff --

@mgaido91, I agree with you and am happy to do it; I just saw your concerns about binary compatibility. @cloud-fan @maropu please share your thoughts on this.
[GitHub] spark issue #22318: [SPARK-25150][SQL] Fix attribute deduplication in join
Github user peter-toth commented on the issue:
https://github.com/apache/spark/pull/22318

@mgaido91, 2.2 also suffered from this.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214793247

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    val a = spark.range(1, 5)
+    val b = spark.range(10)
+    val c = b.filter($"id" % 2 === 0)
+
+    val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

That simpler join doesn't hit the issue. It is handled by a different rule, `ResolveNaturalAndUsingJoin`.
[GitHub] spark issue #22318: [SPARK-25150][SQL] Fix attribute deduplication in join
Github user peter-toth commented on the issue:
https://github.com/apache/spark/pull/22318

Also added the missing `if attr.resolved` guard, which I think will fix the UT issues.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214732767

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -817,7 +819,7 @@ class Analyzer(
           case s: SubqueryExpression =>
             s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
         }
-      }
+      }, attributeRewrites)
--- End diff --

done
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214732751

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
        * that this rule cannot handle. When that is the case, there must be another rule
        * that resolves these conflicts. Otherwise, the analysis will fail.
        */
-      right
+      (right, AttributeMap.empty[Attribute])
       case Some((oldRelation, newRelation)) =>
         val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-        right transformUp {
+        (right transformUp {
--- End diff --

done
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214732731

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +930,16 @@ class Analyzer(
           failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

       // To resolve duplicate expression IDs for Join and Intersect
-      case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-        j.copy(right = dedupRight(left, right))
+      case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+        val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+        val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

thanks! fixed
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214666748

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

I think we do need `a` here. If we dropped `a`, the test would become:

```scala
val b = spark.range(1, 5)
val c = b.filter($"id" % 2 === 0)

val r = b.join(c, b("id") === c("id"), "inner")

checkAnswer(r, Row(2, 2) :: Row(4, 4) :: Nil)
```

and it would pass even without the fix. This is because `Dataset` has a special case to handle `id = id` like conditions for `EqualTo` and `EqualNullSafe`. My fix comes into play in other cases, where there is an `AttributeReference` change on the right side of a join due to deduplication.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r21451

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -895,6 +897,13 @@ class Analyzer(
       case _ => e.mapChildren(resolve(_, q))
     }

+    private def rewriteJoinCondition(
+        e: Expression,
+        attributeRewrites: AttributeMap[Attribute]): Expression = e match {
+      case a: Attribute => attributeRewrites.getOrElse(a, a)
+      case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

Sorry, we can't do that; it wouldn't mean the same thing.
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214666333

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
--- End diff --

actually we can remove this
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user peter-toth commented on a diff in the pull request:
https://github.com/apache/spark/pull/22318#discussion_r214666206

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
+
+      checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil)
+    }
+  }
+
--- End diff --

done
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
GitHub user peter-toth opened a pull request:
https://github.com/apache/spark/pull/22318

[SPARK-25150][SQL] Fix attribute deduplication in join

## What changes were proposed in this pull request?

Fixes attribute deduplication in join conditions.

## How was this patch tested?

Added unit test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25150

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22318.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22318

commit be7d8e7fb8439e3bb3238269263a37556e6bf9b1
Author: Peter Toth
Date: 2018-09-02T17:56:18Z

    [SPARK-25150][SQL] Fix attribute deduplication in join