[jira] [Commented] (SPARK-47836) Performance problem with QuantileSummaries
    [ https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836606#comment-17836606 ]

Tanel Kiis commented on SPARK-47836:

I would be willing to make a PR, but I do not know the right solution. Migrating from the custom solution to KllDoublesSketch would be the cleanest, but its results are nondeterministic.
[jira] [Commented] (SPARK-47836) Performance problem with QuantileSummaries
    [ https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836605#comment-17836605 ]

Tanel Kiis commented on SPARK-47836:

{noformat}
QuantileSummaries:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
Only insert                                       168            171           8         6.0         167.6       1.0X
Insert & merge                                   6690           6792         143         0.1        6690.3       0.0X
KllFloatsSketch insert                             44             47           6        22.5          44.4       3.8X
KllFloatsSketch Insert & merge                     55             57           6        18.3          54.7       3.1X
{noformat}

{code:java}
object QuantileSummariesBenchmark extends BenchmarkBase {

  def test(name: String, numValues: Int): Unit = {
    runBenchmark(name) {
      val values = (1 to numValues).map(_ => Random.nextDouble())

      val benchmark = new Benchmark(name, numValues, output = output)

      benchmark.addCase("Only insert") { _: Int =>
        var summaries = new QuantileSummaries(
          compressThreshold = QuantileSummaries.defaultCompressThreshold,
          relativeError = QuantileSummaries.defaultRelativeError)
        for (value <- values) {
          summaries = summaries.insert(value)
        }
        summaries = summaries.compress()
        println("Median: " + summaries.query(0.5))
      }

      benchmark.addCase("Insert & merge") { _: Int =>
        // Insert values in batches of 1000 and merge the summaries.
        val summaries = values.grouped(1000).map(vs => {
          var partialSummaries = new QuantileSummaries(
            compressThreshold = QuantileSummaries.defaultCompressThreshold,
            relativeError = QuantileSummaries.defaultRelativeError)
          for (value <- vs) {
            partialSummaries = partialSummaries.insert(value)
          }
          partialSummaries.compress()
        }).reduce(_.merge(_))

        println("Median: " + summaries.query(0.5))
      }

      benchmark.addCase("KllFloatsSketch insert") { _: Int =>
        val summaries = KllDoublesSketch.newHeapInstance(
          KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, true)
        )
        for (value <- values) {
          summaries.update(value)
        }
        println("Median: " + summaries.getQuantile(0.5))
      }

      benchmark.addCase("KllFloatsSketch Insert & merge") { _: Int =>
        // Insert values in batches of 1000 and merge the sketches.
        val summaries = values.grouped(1000).map(vs => {
          val partialSummaries = KllDoublesSketch.newHeapInstance(
            KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, true)
          )
          for (value <- vs) {
            partialSummaries.update(value)
          }
          partialSummaries
        }).reduce((a, b) => {
          a.merge(b)
          a
        })

        println("Median: " + summaries.getQuantile(0.5))
      }

      benchmark.run()
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    test("QuantileSummaries", 1_000_000)
  }
}
{code}
[jira] [Updated] (SPARK-47836) Performance problem with QuantileSummaries
    [ https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-47836:
---
Description:

SPARK-29336 caused a severe performance regression. In practice, a partial_aggregate with several approx_percentile calls ran in less than an hour, while the final aggregation after the exchange would have taken over a week. A simple percentile ran in about the same time in the first part, and its final aggregate ran very quickly.

I made a benchmark, and it reveals that the merge operation is very, very slow:
https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e

From my experiments it looks like it is n^2 in the number of partitions (the number of partial aggregations to merge). When I reverted the changes made in that PR, the "Only insert" and "Insert & merge" cases were very similar.

The cause seems to be that compressImmut barely reduces the number of samples after merges and just keeps iterating over an ever-growing list. I was not able to figure out how to fix the issue without just reverting the PR.

I also added a benchmark with KllDoublesSketch from the Apache DataSketches project, and it worked even better than this class did before that PR. The only downside is that it is non-deterministic.
[jira] [Created] (SPARK-46070) Precompile regex patterns in SparkDateTimeUtils.getZoneId
Tanel Kiis created SPARK-46070:
--
Summary: Precompile regex patterns in SparkDateTimeUtils.getZoneId
Key: SPARK-46070
URL: https://issues.apache.org/jira/browse/SPARK-46070
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.5.0
Reporter: Tanel Kiis

SparkDateTimeUtils.getZoneId uses the String.replaceFirst method, which internally does a Pattern.compile(regex). This method is called once for each dataset row when using functions like from_utc_timestamp.
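A minimal sketch of the difference, assuming a hypothetical normalization step (the pattern and helper names here are illustrative, not the actual code in SparkDateTimeUtils.getZoneId): String.replaceFirst recompiles its regex on every call, while a Pattern compiled once can be reused on the per-row hot path.

{code}
import java.util.regex.Pattern

object ZoneIdRegex {
  // Compiled once; the pattern itself is only an illustrative example.
  private val plusLeadingZeros: Pattern = Pattern.compile("\\+0+(?=\\d)")

  // Per-row hot path: reuses the precompiled Pattern.
  def normalize(tz: String): String =
    plusLeadingZeros.matcher(tz).replaceFirst("+")

  // Equivalent but slower: String.replaceFirst calls Pattern.compile
  // internally on every invocation.
  def normalizeSlow(tz: String): String =
    tz.replaceFirst("\\+0+(?=\\d)", "+")
}
{code}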
[jira] [Updated] (SPARK-40664) Union in query can remove cache from the plan
    [ https://issues.apache.org/jira/browse/SPARK-40664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-40664:
---
Description:

Failing unit test:

{code}
  test("SPARK-40664: Cache with join, union and renames") {
    val df1 = Seq("1", "2").toDF("a")
    val df2 = Seq("2", "3").toDF("a")
      .withColumn("b", lit("b"))

    val joined = df1.join(broadcast(df2), "a")
      // Messing around with the column can cause some problems with the cache manager
      .withColumn("tmp_b", $"b")
      .drop("b")
      .withColumnRenamed("tmp_b", "b")
      .cache()

    val unioned = joined.union(joined)

    assertCached(unioned, 2)
  }
{code}

After this PR the test started failing: https://github.com/apache/spark/pull/35214

Plan before:
{code}
== Physical Plan ==
Union
:- InMemoryTableScan [a#4, b#23]
:     +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 1 replicas)
:           +- *(2) Project [a#4, b AS b#23]
:              +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
:                 :- *(2) Project [value#1 AS a#4]
:                 :  +- *(2) Filter isnotnull(value#1)
:                 :     +- *(2) LocalTableScan [value#1]
:                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
:                    +- *(1) Project [value#7 AS a#10]
:                       +- *(1) Filter isnotnull(value#7)
:                          +- *(1) LocalTableScan [value#7]
+- InMemoryTableScan [a#4, b#23]
      +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(2) Project [a#4, b AS b#23]
               +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
                  :- *(2) Project [value#1 AS a#4]
                  :  +- *(2) Filter isnotnull(value#1)
                  :     +- *(2) LocalTableScan [value#1]
                  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
                     +- *(1) Project [value#7 AS a#10]
                        +- *(1) Filter isnotnull(value#7)
                           +- *(1) LocalTableScan [value#7]
{code}

Plan after:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [a#4, b AS b#23]
   :  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
   :     :- Project [value#1 AS a#4]
   :     :  +- Filter isnotnull(value#1)
   :     :     +- LocalTableScan [value#1]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#115]
   :        +- Project [value#7 AS a#10]
   :           +- Filter isnotnull(value#7)
   :              +- LocalTableScan [value#7]
   +- Project [a#4, b AS b#39]
      +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
         :- Project [value#36 AS a#4]
         :  +- Filter isnotnull(value#36)
         :     +- LocalTableScan [value#36]
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#118]
            +- Project [value#37 AS a#10]
               +- Filter isnotnull(value#37)
                  +- LocalTableScan [value#37]
{code}

(The InMemoryTableScan is missing.)
[jira] [Commented] (SPARK-40664) Union in query can remove cache from the plan
    [ https://issues.apache.org/jira/browse/SPARK-40664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17612881#comment-17612881 ]

Tanel Kiis commented on SPARK-40664:

I do not think that https://github.com/apache/spark/pull/35214 has a bug, but it instead revealed an existing bug in cache management.
[jira] [Created] (SPARK-40664) Union in query can remove cache from the plan
Tanel Kiis created SPARK-40664:
--
Summary: Union in query can remove cache from the plan
Key: SPARK-40664
URL: https://issues.apache.org/jira/browse/SPARK-40664
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis
[jira] [Commented] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField
    [ https://issues.apache.org/jira/browse/SPARK-38485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17523363#comment-17523363 ]

Tanel Kiis commented on SPARK-38485:

Is there then even any point in having non-deterministic methods in Spark? Some optimizations are disabled for them to avoid similar situations.
[jira] [Updated] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField
    [ https://issues.apache.org/jira/browse/SPARK-38485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-38485:
---
Description:

When adding fields to the result of a non-deterministic UDF that returns a struct, that UDF is executed multiple times (once per field) for each row.

In this UT df1 passes, but df2 fails with something like: "279751724 did not equal -1023188908"

{code}
  test("SPARK-X: non-deterministic UDF should be called once when adding fields") {
    val nondeterministicUDF = udf((s: Int) => {
      val r = Random.nextInt()
      // Both values should be the same
      GroupByKey(r, r)
    }).asNondeterministic()

    val df1 = spark.range(5).select(nondeterministicUDF($"id"))
    df1.collect().foreach {
      row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
    }

    val df2 = spark.range(5).select(nondeterministicUDF($"id").withField("new", lit(7)))
    df2.collect().foreach {
      row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
    }
  }
{code}
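A hedged diagnostic sketch (not part of the ticket; df2 and the UDF are the ones from the test above): one way to observe the duplication without depending on the random values is to count how many ScalaUDF expressions survive in the optimized plan of df2.

{code}
import org.apache.spark.sql.catalyst.expressions.ScalaUDF

// Count ScalaUDF occurrences across every node of the optimized plan.
// If withField duplicated the UDF per struct field, this is greater than 1.
val udfOccurrences = df2.queryExecution.optimizedPlan
  .flatMap(_.expressions.flatMap(_.collect { case u: ScalaUDF => u }))
  .size
println(s"ScalaUDF occurrences in the optimized plan of df2: $udfOccurrences")
{code}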
[jira] [Created] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField
Tanel Kiis created SPARK-38485:
--
Summary: Non-deterministic UDF executed multiple times when combined with withField
Key: SPARK-38485
URL: https://issues.apache.org/jira/browse/SPARK-38485
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis

When adding fields to the result of a non-deterministic UDF that returns a struct, that UDF is executed multiple times (once per field) for each row.

In this UT df1 passes, but df2 fails with something like: "279751724 did not equal -1023188908"

{code}
  test("SPARK-X: non-deterministic UDF should be called once when adding fields") {
    val nondeterministicUDF = udf((s: Int) => {
      val r = Random.nextInt()
      // Both values should be the same
      GroupByKey(r, r)
    }).asNondeterministic()

    val df1 = spark.range(5).select(
      nondeterministicUDF($"id").as("struct"))
    df1.collect().foreach {
      row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
    }

    val df2 = spark.range(5).select(
      nondeterministicUDF($"id").withField("new", lit(7)).as("struct"))
    df2.collect().foreach {
      row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
    }
  }
{code}
[jira] [Commented] (SPARK-38282) Avoid duplicating complex partitioning expressions
    [ https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495899#comment-17495899 ]

Tanel Kiis commented on SPARK-38282:

[~cloud_fan], any ideas how to improve this? I could submit a PR, but I'm not sure what would be the best way here.
[jira] [Updated] (SPARK-38282) Avoid duplicating complex partitioning expressions
    [ https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-38282:
---
Description:

Spark will duplicate all non-trivial expressions in Window.partitionBy, which results in duplicate exchanges and WindowExec nodes.

An example unit test:

{code}
  test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
    val group = functions.col("id") % 2
    val min = functions.min("id").over(Window.partitionBy(group))
    val max = functions.max("id").over(Window.partitionBy(group))

    val df1 = spark.range(1, 4)
      .withColumn("ratio", max / min)
    val df2 = spark.range(1, 4)
      .withColumn("min", min)
      .withColumn("max", max)
      .select(col("id"), (col("max") / col("min")).as("ratio"))

    Seq(df1, df2).foreach { df =>
      checkAnswer(
        df,
        Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

      val windows = collect(df.queryExecution.executedPlan) {
        case w: WindowExec => w
      }
      assert(windows.size == 1)
    }
  }
{code}

The query plan for this (_w0#5L and _w1#6L are duplicates):

{code}
Window [min(id#2L) windowspecdefinition(_w1#6L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#8L], [_w1#6L]
+- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS, [id=#256]
            +- *(3) Project [id#2L, _w1#6L, _we0#7L]
               +- Window [max(id#2L) windowspecdefinition(_w0#5L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#7L], [_w0#5L]
                  +- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
                     +- AQEShuffleRead coalesced
                        +- ShuffleQueryStage 0
                           +- Exchange hashpartitioning(_w0#5L, 5), ENSURE_REQUIREMENTS, [id=#203]
                              +- *(1) Project [id#2L, (id#2L % 2) AS _w0#5L, (id#2L % 2) AS _w1#6L]
                                 +- *(1) Range (1, 4, step=1, splits=2)
{code}
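A hedged workaround sketch (my assumption, not a fix proposed in the ticket): if the partitioning expression is materialized as a real column first, both window functions partition by the same attribute, so Spark can plan a single exchange and WindowExec for them.

{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min}

// Materialize the grouping expression once, then partition by the column.
val withGroup = spark.range(1, 4).withColumn("group", col("id") % 2)
val byGroup = Window.partitionBy("group")

val ratio = withGroup
  .withColumn("ratio", max("id").over(byGroup) / min("id").over(byGroup))
  .drop("group")
{code}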
[jira] [Created] (SPARK-38282) Avoid duplicating complex partitioning expressions
Tanel Kiis created SPARK-38282:
--
Summary: Avoid duplicating complex partitioning expressions
Key: SPARK-38282
URL: https://issues.apache.org/jira/browse/SPARK-38282
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis

{code}
  test("SPARK-X: Avoid duplicating complex partitioning expressions") {
    val group = functions.col("id") % 2
    val min = functions.min("id").over(Window.partitionBy(group))
    val max = functions.max("id").over(Window.partitionBy(group))

    val df1 = spark.range(1, 4)
      .withColumn("ratio", max / min)
    val df2 = spark.range(1, 4)
      .withColumn("min", min)
      .withColumn("max", max)
      .select(col("id"), (col("max") / col("min")).as("ratio"))

    Seq(df1, df2).foreach { df =>
      checkAnswer(
        df,
        Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

      val windows = collect(df.queryExecution.executedPlan) {
        case w: WindowExec => w
      }
      assert(windows.size == 1)
    }
  }
{code}
[jira] [Created] (SPARK-37538) Replace single projection Expand with Project
Tanel Kiis created SPARK-37538:
--
Summary: Replace single projection Expand with Project
Key: SPARK-37538
URL: https://issues.apache.org/jira/browse/SPARK-37538
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis
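The ticket carries no description; a minimal sketch of what the title suggests, assuming Catalyst's Expand and Project logical operators (illustrative only, not the rule that was actually contributed): an Expand with exactly one projection list emits one output row per input row, so it behaves like a Project and can be rewritten into one.

{code}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object ReplaceSingleProjectionExpand extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // A single projection list means the Expand does not multiply rows.
    case Expand(Seq(projection), output, child) =>
      Project(projection.zip(output).map {
        case (expr, attr) => Alias(expr, attr.name)(exprId = attr.exprId)
      }, child)
  }
}
{code}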
[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
    [ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-37487:
---
Description:

It is best exemplified by this new UT in DataFrameCallbackSuite:

{code}
  test("SPARK-37487: get observable metrics with sort by callback") {
    val df = spark.range(100)
      .observe(
        name = "my_event",
        min($"id").as("min_val"),
        max($"id").as("max_val"),
        // Test unresolved alias
        sum($"id"),
        count(when($"id" % 2 === 0, 1)).as("num_even"))
      .observe(
        name = "other_event",
        avg($"id").cast("int").as("avg_val"))
      .sort($"id".desc)

    validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:

{code}
[info] - SPARK-37487: get observable metrics with sort by callback *** FAILED *** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.
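A hedged sketch of reading the observed metrics outside the test suite (the listener wiring is my assumption; validateObservedMetrics in DataFrameCallbackSuite does something similar): register a QueryExecutionListener and inspect QueryExecution.observedMetrics after the action completes.

{code}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Print the observed metrics of every successfully executed query.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // One Row per observe() call, keyed by the observation name.
    qe.observedMetrics.foreach { case (name, row) => println(s"$name -> $row") }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

df.collect()  // 'df' is the observed DataFrame from the test above
{code}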
[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
    [ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-37487:
---
Labels: correctness  (was: )
[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
    [ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanel Kiis updated SPARK-37487:
---
Summary: CollectMetrics is executed twice if it is followed by a sort  (was: CollectMetrics is executed twice if it is followed by an sort)
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort
    [ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450452#comment-17450452 ]

Tanel Kiis commented on SPARK-37487:

[~cloud_fan] and [~sarutak], you helped with the last CollectMetrics bug. Perhaps you have some idea why this is happening.
[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-37487: --- Description: It is best examplified by this new UT in DataFrameCallbackSuite: {code} test("SPARK-X: get observable metrics with sort by callback") { val df = spark.range(100) .observe( name = "my_event", min($"id").as("min_val"), max($"id").as("max_val"), // Test unresolved alias sum($"id"), count(when($"id" % 2 === 0, 1)).as("num_even")) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .sort($"id".desc) validateObservedMetrics(df) } {code} The count aggregate reports twice the number of rows: {code} [info] - SPARK-X: get observable metrics with sort by callback *** FAILED *** (169 milliseconds) [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) {code} I could not figure out how this happes. 
Hopefully the UT can help with debugging was: It is bets examplified by this new UT in DataFrameCallbackSuite: {code} test("SPARK-X: get observable metrics with sort by callback") { val df = spark.range(100) .observe( name = "my_event", min($"id").as("min_val"), max($"id").as("max_val"), // Test unresolved alias sum($"id"), count(when($"id" % 2 === 0, 1)).as("num_even")) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .sort($"id".desc) validateObservedMetrics(df) } {code} The count aggregate reports twice the number of rows: {code} [info] - SPARK-X: get observable metrics with sort by callback *** FAILED *** (169 milliseconds) [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) {code} I could not figure out how this happes. Hopefully the UT can help with debugging > CollectMetrics is executed twice if it is followed by an sort > - > > Key: SPARK-37487 > URL: https://issues.apache.org/jira/browse/SPARK-37487 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Major > > It is best examplified by this new UT in DataFrameCallbackSuite: > {code} > test("SPARK-X: get observable metrics with sort by callback") { > val df = spark.range(100) > .observe( > name = "my_event",
[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-37487: --- Description: It is best examplified by this new UT in DataFrameCallbackSuite: {code} test("SPARK-X: get observable metrics with sort by callback") { val df = spark.range(100) .observe( name = "my_event", min($"id").as("min_val"), max($"id").as("max_val"), // Test unresolved alias sum($"id"), count(when($"id" % 2 === 0, 1)).as("num_even")) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .sort($"id".desc) validateObservedMetrics(df) } {code} The count and sum aggregate report twice the number of rows: {code} [info] - SPARK-X: get observable metrics with sort by callback *** FAILED *** (169 milliseconds) [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) {code} I could not figure out how this happes. 
Hopefully the UT can help with debugging was: It is best examplified by this new UT in DataFrameCallbackSuite: {code} test("SPARK-X: get observable metrics with sort by callback") { val df = spark.range(100) .observe( name = "my_event", min($"id").as("min_val"), max($"id").as("max_val"), // Test unresolved alias sum($"id"), count(when($"id" % 2 === 0, 1)).as("num_even")) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .sort($"id".desc) validateObservedMetrics(df) } {code} The count and sum aggregate reports twice the number of rows: {code} [info] - SPARK-X: get observable metrics with sort by callback *** FAILED *** (169 milliseconds) [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) {code} I could not figure out how this happes. Hopefully the UT can help with debugging > CollectMetrics is executed twice if it is followed by an sort > - > > Key: SPARK-37487 > URL: https://issues.apache.org/jira/browse/SPARK-37487 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Major > > It is best examplified by this new UT in DataFrameCallbackSuite: > {code} > test("SPARK-X: get observable metrics with sort by callback") { > val df = spark.range(100) > .observe( > name
[jira] [Created] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort
Tanel Kiis created SPARK-37487: -- Summary: CollectMetrics is executed twice if it is followed by an sort Key: SPARK-37487 URL: https://issues.apache.org/jira/browse/SPARK-37487 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: Tanel Kiis It is bets examplified by this new UT in DataFrameCallbackSuite: {code} test("SPARK-X: get observable metrics with sort by callback") { val df = spark.range(100) .observe( name = "my_event", min($"id").as("min_val"), max($"id").as("max_val"), // Test unresolved alias sum($"id"), count(when($"id" % 2 === 0, 1)).as("num_even")) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .sort($"id".desc) validateObservedMetrics(df) } {code} The count aggregate reports twice the number of rows: {code} [info] - SPARK-X: get observable metrics with sort by callback *** FAILED *** (169 milliseconds) [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) {code} I could not figure out how this happes. Hopefully the UT can help with debugging -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36844) Window function "first" (unboundedFollowing) appears significantly slower than "last" (unboundedPreceding) in identical circumstances
[ https://issues.apache.org/jira/browse/SPARK-36844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436116#comment-17436116 ] Tanel Kiis commented on SPARK-36844: Hello, I also hit this issue a while back and found that it is briefly explained in this code comment: https://github.com/apache/spark/blob/abf9675a7559d5666e40f25098334b5edbf8c0c3/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala#L609-L611 So it is not the fault of the first aggregator, but of the UnboundedFollowing window frame. There are definitely some optimizations that could be done. If I followed your code correctly, then I think you would be better off using the [lead|https://spark.apache.org/docs/latest/api/sql/index.html#lead] and [lag|https://spark.apache.org/docs/latest/api/sql/index.html#lag] window functions. With those you can drop the .rowsBetween(...) part from your window specs. > Window function "first" (unboundedFollowing) appears significantly slower > than "last" (unboundedPreceding) in identical circumstances > - > > Key: SPARK-36844 > URL: https://issues.apache.org/jira/browse/SPARK-36844 > Project: Spark > Issue Type: Bug > Components: PySpark, Windows >Affects Versions: 3.1.1 >Reporter: Alain Bryden >Priority: Minor > Attachments: Physical Plan 2 - workaround.png, Pysical Plan.png > > > I originally posted a question on SO because I thought perhaps I was doing > something wrong: > [https://stackoverflow.com/questions/69308560|https://stackoverflow.com/questions/69308560/spark-first-window-function-is-taking-much-longer-than-last?noredirect=1#comment122505685_69308560] > Perhaps I am, but I'm now fairly convinced that there's something wonky with > the implementation of `first` that's causing it to unnecessarily have a much > worse complexity than `last`. > > More or less copy-pasted from the above post: > I was working on a pyspark routine to interpolate the missing values in a > configuration table. > Imagine a table of configuration values that go from 0 to 50,000. The user > specifies a few data points in between (say at 0, 50, 100, 500, 2000, 50) > and we interpolate the remainder. My solution mostly follows [this blog > post|https://walkenho.github.io/interpolating-time-series-p2-spark/] quite > closely, except I'm not using any UDFs. > In troubleshooting the performance of this (takes ~3 minutes) I found that > one particular window function is taking all of the time, and everything else > I'm doing takes mere seconds.
> Here is the main area of interest - where I use window functions to fill in > the previous and next user-supplied configuration values: > {code:python} > from pyspark.sql import Window, functions as F > # Create partition windows that are required to generate new rows from the > ones provided > win_last = Window.partitionBy('PORT_TYPE', > 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0) > win_next = Window.partitionBy('PORT_TYPE', > 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing) > # Join back in the provided config table to populate the "known" scale factors > df_part1 = (df_scale_factors_template > .join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter') > # Add computed columns that can lookup the prior config and next config for > each missing value > .withColumn('last_rank', F.last( F.col('rank'), > ignorenulls=True).over(win_last)) > .withColumn('last_sf', F.last( F.col('scale_factor'), > ignorenulls=True).over(win_last)) > ).cache() > debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1 > df_part2 = (df_part1 > .withColumn('next_rank', F.first(F.col('rank'), > ignorenulls=True).over(win_next)) > .withColumn('next_sf', F.first(F.col('scale_factor'), > ignorenulls=True).over(win_next)) > ).cache() > debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2 > df_part3 = (df_part2 > # Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * > (x-x1) > .withColumn('scale_factor', > F.when(F.col('last_rank')==F.col('next_rank'), > F.col('last_sf')) # Handle div/0 case > .otherwise(F.col('last_sf') + > ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) > * (F.col('rank')-F.col('last_rank' > .select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor') > ).cache() > debug_log_dataframe(df_part3, 'df_part3', explain: True) > {code} > > The above used to be a single chained dataframe statement, but I've since > split it into 3 parts so that I could isolate the part that's taking so long. > The results
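A minimal Scala sketch of the lead/lag rewrite suggested in the comment above. The DataFrame name (dfScaleFactors) is a stand-in for the reporter's joined template table, and the window columns are taken from the quoted PySpark code; this is an illustration under those assumptions, not the reporter's actual job:
{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// Same partitioning and ordering as the original window specs, but with no
// rowsBetween(...) frame, so neither direction uses the UnboundedFollowing frame.
val win = Window.partitionBy("PORT_TYPE", "loss_process").orderBy("rank")

// dfScaleFactors is a hypothetical stand-in for the reporter's template/config join.
val dfWithNeighbours = dfScaleFactors
  .withColumn("last_rank", lag(col("rank"), 1).over(win))   // previous row's rank
  .withColumn("next_rank", lead(col("rank"), 1).over(win))  // next row's rank
{code}
Note that lag/lead with an offset of 1 only look at the directly adjacent row, so if several consecutive rows can be missing the user-supplied value, the original last/first-with-ignorenulls frames (or larger offsets) would still be needed; the point of the suggestion is that dropping the rowsBetween(...) frame avoids the slow UnboundedFollowing code path.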
[jira] [Created] (SPARK-37074) Push extra predicates through non-join
Tanel Kiis created SPARK-37074: -- Summary: Push extra predicates through non-join Key: SPARK-37074 URL: https://issues.apache.org/jira/browse/SPARK-37074 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: Tanel Kiis In the Optimizer we could partially push some predicates through non-join nodes that produce new columns: Aggregate, Generate, Window. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
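For illustration only (the query below is made up, not taken from this ticket): a filter above an Aggregate can mix a conjunct that references only the grouping key, which in principle can be evaluated below the Aggregate, with a conjunct on a newly produced column, which cannot; the same distinction applies to columns produced by Generate and Window.
{code}
import org.apache.spark.sql.functions.{col, count}

// Illustrative query: `key > 5` only references the grouping column and can be
// evaluated before the aggregation, while `cnt > 3` references the aggregate
// output and has to stay above the Aggregate node.
val df = spark.range(100)
  .selectExpr("id % 10 AS key", "id AS value")
  .groupBy("key")
  .agg(count("value").as("cnt"))
  .where(col("key") > 5 && col("cnt") > 3)
{code}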
[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates
[ https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421334#comment-17421334 ] Tanel Kiis commented on SPARK-36861: Yes, in 3.1 it is parsed as string. In 3.3 (master) it is parsed as date. > Partition columns are overly eagerly parsed as dates > > > Key: SPARK-36861 > URL: https://issues.apache.org/jira/browse/SPARK-36861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Blocker > > I have an input directory with subdirs: > * hour=2021-01-01T00 > * hour=2021-01-01T01 > * hour=2021-01-01T02 > * ... > in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it > is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates
[ https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420553#comment-17420553 ] Tanel Kiis commented on SPARK-36861: Sorry, indeed I ran the test on master. Never mind then; it does not impact the 3.2 release. > Partition columns are overly eagerly parsed as dates > > > Key: SPARK-36861 > URL: https://issues.apache.org/jira/browse/SPARK-36861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Major > > I have an input directory with subdirs: > * hour=2021-01-01T00 > * hour=2021-01-01T01 > * hour=2021-01-01T02 > * ... > in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it > is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates
[ https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420533#comment-17420533 ] Tanel Kiis commented on SPARK-36861: If this is expected behaviour, then I would expect there to be a simple way to turn this off. Currently the only one I can think of is manually specifying the schema. > Partition columns are overly eagerly parsed as dates > > > Key: SPARK-36861 > URL: https://issues.apache.org/jira/browse/SPARK-36861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > I have an input directory with subdirs: > * hour=2021-01-01T00 > * hour=2021-01-01T01 > * hour=2021-01-01T02 > * ... > in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it > is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
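A sketch of the schema workaround mentioned in the comment above; the path and columns are made up. Giving the partition column an explicit string type keeps "hour=2021-01-01T00" intact instead of letting it be inferred as a date:
{code}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema: the data columns plus the partition column typed as string.
val schema = StructType(Seq(
  StructField("value", LongType),
  StructField("hour", StringType)
))

val df = spark.read
  .schema(schema)
  .parquet("/data/events")  // directory containing the hour=... subdirectories
{code}
Disabling partition column type inference altogether via spark.sql.sources.partitionColumnTypeInference.enabled=false should have a similar effect, at the cost of every partition column becoming a string.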
[jira] [Comment Edited] (SPARK-36861) Partition columns are overly eagerly parsed as dates
[ https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420532#comment-17420532 ] Tanel Kiis edited comment on SPARK-36861 at 9/27/21, 7:45 AM: -- [~Gengliang.Wang] I think, that this should be considered as a blocker for the 3.2 release was (Author: tanelk): [~Gengliang.Wang] I think, that this should be considered as a blocker. > Partition columns are overly eagerly parsed as dates > > > Key: SPARK-36861 > URL: https://issues.apache.org/jira/browse/SPARK-36861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > I have an input directory with subdirs: > * hour=2021-01-01T00 > * hour=2021-01-01T01 > * hour=2021-01-01T02 > * ... > in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it > is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates
[ https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420532#comment-17420532 ] Tanel Kiis commented on SPARK-36861: [~Gengliang.Wang] I think, that this should be considered as a blocker. > Partition columns are overly eagerly parsed as dates > > > Key: SPARK-36861 > URL: https://issues.apache.org/jira/browse/SPARK-36861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > I have an input directory with subdirs: > * hour=2021-01-01T00 > * hour=2021-01-01T01 > * hour=2021-01-01T02 > * ... > in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it > is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36861) Partition columns are overly eagerly parsed as dates
Tanel Kiis created SPARK-36861: -- Summary: Partition columns are overly eagerly parsed as dates Key: SPARK-36861 URL: https://issues.apache.org/jira/browse/SPARK-36861 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis I have an input directory with subdirs: * hour=2021-01-01T00 * hour=2021-01-01T01 * hour=2021-01-01T02 * ... in spark 3.1 the 'hour' column is parsed as a string type, but in 3.2 RC it is parsed as date type and the hour part is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36496) Remove literals from grouping expressions when using the DataFrame API
[ https://issues.apache.org/jira/browse/SPARK-36496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-36496: --- Description: The RemoveLiteralFromGroupExpressions rule might not work, when using the DataFrame API > Remove literals from grouping expressions when using the DataFrame API > -- > > Key: SPARK-36496 > URL: https://issues.apache.org/jira/browse/SPARK-36496 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > The RemoveLiteralFromGroupExpressions rule might not work, when using the > DataFrame API -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
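An illustrative (made-up) query with a literal among the DataFrame-API grouping expressions, the pattern that RemoveLiteralFromGroupExpressions is meant to clean up:
{code}
import org.apache.spark.sql.functions.{col, lit, sum}

// The literal contributes nothing to the grouping and could be removed from the
// grouping expressions by the optimizer.
val df = spark.range(100)
  .selectExpr("id % 10 AS key", "id AS value")
  .groupBy(lit("constant"), col("key"))
  .agg(sum("value"))
{code}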
[jira] [Created] (SPARK-36496) Remove literals from grouping expressions when using the DataFrame API
Tanel Kiis created SPARK-36496: -- Summary: Remove literals from grouping expressions when using the DataFrame API Key: SPARK-36496 URL: https://issues.apache.org/jira/browse/SPARK-36496 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35765) Distinct aggs are not duplicate sensitive
Tanel Kiis created SPARK-35765: -- Summary: Distinct aggs are not duplicate sensitive Key: SPARK-35765 URL: https://issues.apache.org/jira/browse/SPARK-35765 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis Extended RemoveRedundantAggregates to remove deduplicating aggregations before aggregations that ignore duplicates. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
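An illustrative (made-up) query shape for the described change: the dropDuplicates becomes a deduplicating Aggregate, and the following max gives the same result with or without it, so the deduplicating Aggregate is redundant:
{code}
import org.apache.spark.sql.functions.max

// `max` returns the same answer whether or not duplicate (key, value) rows were
// removed first, so the dropDuplicates aggregate can be dropped by the rule.
val df = spark.range(100)
  .selectExpr("id % 10 AS key", "id % 7 AS value")
  .dropDuplicates("key", "value")
  .groupBy("key")
  .agg(max("value"))
{code}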
[jira] [Created] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
Tanel Kiis created SPARK-35695: -- Summary: QueryExecutionListener does not see any observed metrics fired before persist/cache Key: SPARK-35695 URL: https://issues.apache.org/jira/browse/SPARK-35695 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired: {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35695: --- Description: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} I assume, that on the first run, the event is fired. The listener: {code} val metricMaps = ArrayBuffer.empty[Map[String, Row]] val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metricMaps += qe.observedMetrics } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } spark.listenerManager.register(listener) {code} was: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} The listener: {code} val metricMaps = ArrayBuffer.empty[Map[String, Row]] val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metricMaps += qe.observedMetrics } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } spark.listenerManager.register(listener) {code} > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "my_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} > I assume, that on the first run, the event is fired. > The listener: > {code} > val metricMaps = ArrayBuffer.empty[Map[String, Row]] > val listener = new QueryExecutionListener { > override def onSuccess(funcName: String, qe: QueryExecution, duration: > Long): Unit = { > metricMaps += qe.observedMetrics > } > override def onFailure(funcName: String, qe: QueryExecution, exception: > Exception): Unit = { > // No-op > } > } > spark.listenerManager.register(listener) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35695: --- Description: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} The listener: {code} val metricMaps = ArrayBuffer.empty[Map[String, Row]] val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metricMaps += qe.observedMetrics } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } spark.listenerManager.register(listener) {code} was: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} I assume, that on the first run, the event is fired. The listener: {code} val metricMaps = ArrayBuffer.empty[Map[String, Row]] val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metricMaps += qe.observedMetrics } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } spark.listenerManager.register(listener) {code} > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "my_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} > The listener: > {code} > val metricMaps = ArrayBuffer.empty[Map[String, Row]] > val listener = new QueryExecutionListener { > override def onSuccess(funcName: String, qe: QueryExecution, duration: > Long): Unit = { > metricMaps += qe.observedMetrics > } > override def onFailure(funcName: String, qe: QueryExecution, exception: > Exception): Unit = { > // No-op > } > } > spark.listenerManager.register(listener) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359849#comment-17359849 ] Tanel Kiis commented on SPARK-35695: [~cloud_fan] and [~sarutak], you are currently working on a PR related to the observed metrics. Perhaps you could take a look at this one too. > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "my_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} > The listener: > {code} > val metricMaps = ArrayBuffer.empty[Map[String, Row]] > val listener = new QueryExecutionListener { > override def onSuccess(funcName: String, qe: QueryExecution, duration: > Long): Unit = { > metricMaps += qe.observedMetrics > } > override def onFailure(funcName: String, qe: QueryExecution, exception: > Exception): Unit = { > // No-op > } > } > spark.listenerManager.register(listener) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35695: --- Description: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} The listener: {code} val metricMaps = ArrayBuffer.empty[Map[String, Row]] val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metricMaps += qe.observedMetrics } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } spark.listenerManager.register(listener) {code} was: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "my_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} > The listener: > {code} > val metricMaps = ArrayBuffer.empty[Map[String, Row]] > val listener = new QueryExecutionListener { > override def onSuccess(funcName: String, qe: QueryExecution, duration: > Long): Unit = { > metricMaps += qe.observedMetrics > } > override def onFailure(funcName: String, qe: QueryExecution, exception: > Exception): Unit = { > // No-op > } > } > spark.listenerManager.register(listener) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35695: --- Description: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "my_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} was: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "my_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache
[ https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35695: --- Description: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired or seen (not sure which): {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} was: This example properly fires the event {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .collect() {code} But when I add persist, then no event is fired: {code} spark.range(100) .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) .persist() .collect() {code} > QueryExecutionListener does not see any observed metrics fired before > persist/cache > --- > > Key: SPARK-35695 > URL: https://issues.apache.org/jira/browse/SPARK-35695 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > This example properly fires the event > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .collect() > {code} > But when I add persist, then no event is fired or seen (not sure which): > {code} > spark.range(100) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .persist() > .collect() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35630) ExpandExec should not introduce unnecessary exchanges
Tanel Kiis created SPARK-35630: -- Summary: ExpandExec should not introduce unnecessary exchanges Key: SPARK-35630 URL: https://issues.apache.org/jira/browse/SPARK-35630 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis The Expand is commonly introduced by the RewriteDistinctAggregates optimizer rule. In that case there can be several attributes that are kept as they are by the Expand. If the child's output is partitioned by those attributes, then so will be the output of the Expand. In the general case the Expand can output data with arbitrary partitioning, so it should be reported as UNKNOWN partitioning. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
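An illustrative (made-up) query where the Expand comes from RewriteDistinctAggregates and the grouping key passes through every projection unchanged; if the input is already partitioned by that key, reporting UNKNOWN partitioning on the Expand can force an extra Exchange:
{code}
import org.apache.spark.sql.functions.{col, countDistinct}

// Two distinct aggregates on different columns trigger RewriteDistinctAggregates,
// which inserts an Expand between the repartition and the final aggregation.
// The `key` column is copied through every projection of the Expand untouched.
val df = spark.range(1000)
  .selectExpr("id % 10 AS key", "id % 3 AS a", "id % 7 AS b")
  .repartition(col("key"))
  .groupBy("key")
  .agg(countDistinct("a"), countDistinct("b"))

df.explain()  // check whether an extra Exchange appears above the Expand
{code}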
[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17356253#comment-17356253 ] Tanel Kiis commented on SPARK-35296: Perhaps someone who knows the internals better, ccould help here. [~cloud_fan]? I tried to take another look, but I can't figure it out. > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1, 3.2.0 >Reporter: Tanel Kiis >Priority: Major > Attachments: 2021-05-03_18-34.png > > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It happens in a quite complex query and I have not been able to reproduce > this with a simpler query > Added an screenshot of the debugger, at the moment of exception > !2021-05-03_18-34.png! 
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
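A sketch of the coalesce(1) workaround described in the issue; someComplexDf stands in for the original (much more complex) failing query, and the metric expression is illustrative:
{code}
import org.apache.spark.sql.functions.{avg, col}

// Workaround from the description: force a single partition before observe, so
// the assertion in AggregatingAccumulator.setState is not hit.
val observed = someComplexDf
  .coalesce(1)
  .observe("my_event", avg(col("id")).cast("int").as("avg_val"))
{code}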
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Affects Version/s: 3.2.0 > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1, 3.2.0 >Reporter: Tanel Kiis >Priority: Major > Attachments: 2021-05-03_18-34.png > > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It happens in a quite complex query and I have not been able to reproduce > this with a simpler query > Added an screenshot of the debugger, at the moment of exception > !2021-05-03_18-34.png! -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-34623) Deduplicate window expressions
[ https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis resolved SPARK-34623. Resolution: Won't Do > Deduplicate window expressions > -- > > Key: SPARK-34623 > URL: https://issues.apache.org/jira/browse/SPARK-34623 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Remove duplicate window expressions from the Window node -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-32801) Make InferFiltersFromConstraints take in account EqualNullSafe
[ https://issues.apache.org/jira/browse/SPARK-32801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis resolved SPARK-32801. Resolution: Won't Do > Make InferFiltersFromConstraints take in account EqualNullSafe > -- > > Key: SPARK-32801 > URL: https://issues.apache.org/jira/browse/SPARK-32801 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Major > > InferFiltersFromConstraints only infers new filters using EqualTo, > generalized it to also include EqualNullSafe. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338465#comment-17338465 ] Tanel Kiis commented on SPARK-35296: I finally managed to change the UT in such way, that the assertion error happens - the following coalesce was the missing link. {code} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index b3d29df1b2..16ebddd75c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest } spark.listenerManager.register(listener) try { - val df = spark.range(100) + val df = spark.range(0, 100, 1, 5) .observe( name = "my_event", min($"id").as("min_val"), @@ -256,6 +256,7 @@ class DataFrameCallbackSuite extends QueryTest .observe( name = "other_event", avg($"id").cast("int").as("avg_val")) +.coalesce(2) def checkMetrics(metrics: Map[String, Row]): Unit = { assert(metrics.size === 2) {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > Attachments: 2021-05-03_18-34.png > > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] 
> at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It happens in a quite complex query and I have not been able to reproduce > this with a simpler query > Added an screenshot of the debugger, at the moment of exception > !2021-05-03_18-34.png! -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail:
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Attachment: 2021-05-03_18-34.png > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > Attachments: 2021-05-03_18-34.png > > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It happens in a quite complex query and I have not been able to reproduce > this with a simpler query -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {code} A workaround, that I used was to add .coalesce(1) before calling this method. It happens in a quite complex query and I have not been able to reproduce this with a simpler query Added an screenshot of the debugger, at the moment of exception !2021-05-03_18-34.png! was: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] 
at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {code} A workaround, that I used was to add .coalesce(1) before calling this method. It happens in a quite complex query and I have not been able to reproduce this with a simpler query !2021-05-03_18-34.png! was: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] 
at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1]
[jira] [Comment Edited] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338438#comment-17338438 ] Tanel Kiis edited comment on SPARK-35296 at 5/3/21, 3:58 PM: - I tried to change an excisting UT to reproduce this: {code} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index b3d29df1b2..0fcd31a09c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest } spark.listenerManager.register(listener) try { - val df = spark.range(100) + val df = spark.range(100).repartition(2) .observe( name = "my_event", min($"id").as("min_val"), {code} But did not hit the same exception: {code} [info] - get observable metrics by callback *** FAILED *** (324 milliseconds) [info] 0 did not equal 2 (DataFrameCallbackSuite.scala:261) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270) {code} Although i think that this should not happen either. was (Author: tanelk): I tried to change an excisting UT to reproduce this: {code} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index b3d29df1b2..0fcd31a09c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest } spark.listenerManager.register(listener) try { - val df = spark.range(100) + val df = spark.range(100).repartition(2) .observe( name = "my_event", min($"id").as("min_val"), {code} But did not hit the same exception: {code} [info] - get observable metrics by callback *** FAILED *** (324 milliseconds) [info] 0 did not equal 2 (DataFrameCallbackSuite.scala:261) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270) {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using 
dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at >
[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338441#comment-17338441 ] Tanel Kiis commented on SPARK-35296: [~hvanhovell] The assertion in AggregatingAccumulator is added by you. Perhaps you have some idea on why it happens. > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It happens in a quite complex query and I have not been able to reproduce > this with a simpler query -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338438#comment-17338438 ] Tanel Kiis commented on SPARK-35296: I tried to change an excisting UT to reproduce this: {code} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index b3d29df1b2..0fcd31a09c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest } spark.listenerManager.register(listener) try { - val df = spark.range(100) + val df = spark.range(100).repartition(2) .observe( name = "my_event", min($"id").as("min_val"), {code} But did not hit the same exception: {code} [info] - get observable metrics by callback *** FAILED *** (324 milliseconds) [info] 0 did not equal 2 (DataFrameCallbackSuite.scala:261) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261) [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270) {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] 
> at > org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:147) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > [spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_282] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_282] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {code} > A workaround, that I used was to add .coalesce(1) before calling this method. > It
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {code} A workaround, that I used was to add .coalesce(1) before calling this method. It happens in a quite complex query and I have not been able to reproduce this with a simpler query was: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] 
at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {code} A workouround, that I used wis to add .coalesce(1) before calling this method. was: I hit this assertion error when using dataset.observe: {code} {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?] 
> at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) > ~[spark-sql_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > ~[scala-library-2.12.10.jar:?] >
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} {code} was: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_162] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_162] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162] {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using dataset.observe: > {code} > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion
[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-35296: --- Description: I hit this assertion error when using dataset.observe: {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:208) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_162] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_162] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162] {code} > Dataset.observe fails with an assertion > --- > > Key: SPARK-35296 > URL: https://issues.apache.org/jira/browse/SPARK-35296 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > I hit this assertion error when using dataset.observe: > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:208) > ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] > at > org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) > > ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT] > at > org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) > >
[jira] [Created] (SPARK-35296) Dataset.observe fails with an assertion
Tanel Kiis created SPARK-35296: -- Summary: Dataset.observe fails with an assertion Key: SPARK-35296 URL: https://issues.apache.org/jira/browse/SPARK-35296 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.1 Reporter: Tanel Kiis -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL
[ https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34794: --- Labels: correctness (was: Correctness) > Nested higher-order functions broken in DSL > --- > > Key: SPARK-34794 > URL: https://issues.apache.org/jira/browse/SPARK-34794 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.1.1 > Environment: 3.1.1 >Reporter: Daniel Solow >Priority: Major > Labels: correctness > > In Spark 3, if I have: > {code:java} > val df = Seq( > (Seq(1,2,3), Seq("a", "b", "c")) > ).toDF("numbers", "letters") > {code} > and I want to take the cross product of these two arrays, I can do the > following in SQL: > {code:java} > df.selectExpr(""" > FLATTEN( > TRANSFORM( > numbers, > number -> TRANSFORM( > letters, > letter -> (number AS number, letter AS letter) > ) > ) > ) AS zipped > """).show(false) > ++ > |zipped | > ++ > |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]| > ++ > {code} > This works fine. But when I try the equivalent using the scala DSL, the > result is wrong: > {code:java} > df.select( > f.flatten( > f.transform( > $"numbers", > (number: Column) => { f.transform( > $"letters", > (letter: Column) => { f.struct( > number.as("number"), > letter.as("letter") > ) } > ) } > ) > ).as("zipped") > ).show(10, false) > ++ > |zipped | > ++ > |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]| > ++ > {code} > Note that the numbers are not included in the output. The explain for this > second version is: > {code:java} > == Parsed Logical Plan == > 'Project [flatten(transform('numbers, lambdafunction(transform('letters, > lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, > NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, > false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Analyzed Logical Plan == > zipped: array> > Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, > lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda > x#446, false)), lambda x#445, false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Optimized Logical Plan == > LocalRelation [zipped#444] > == Physical Plan == > LocalTableScan [zipped#444] > {code} > Seems like variable name x is hardcoded. And sure enough: > https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL
[ https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34794: --- Affects Version/s: 3.2.0 > Nested higher-order functions broken in DSL > --- > > Key: SPARK-34794 > URL: https://issues.apache.org/jira/browse/SPARK-34794 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.1.1 > Environment: 3.1.1 >Reporter: Daniel Solow >Priority: Major > > In Spark 3, if I have: > {code:java} > val df = Seq( > (Seq(1,2,3), Seq("a", "b", "c")) > ).toDF("numbers", "letters") > {code} > and I want to take the cross product of these two arrays, I can do the > following in SQL: > {code:java} > df.selectExpr(""" > FLATTEN( > TRANSFORM( > numbers, > number -> TRANSFORM( > letters, > letter -> (number AS number, letter AS letter) > ) > ) > ) AS zipped > """).show(false) > ++ > |zipped | > ++ > |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]| > ++ > {code} > This works fine. But when I try the equivalent using the scala DSL, the > result is wrong: > {code:java} > df.select( > f.flatten( > f.transform( > $"numbers", > (number: Column) => { f.transform( > $"letters", > (letter: Column) => { f.struct( > number.as("number"), > letter.as("letter") > ) } > ) } > ) > ).as("zipped") > ).show(10, false) > ++ > |zipped | > ++ > |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]| > ++ > {code} > Note that the numbers are not included in the output. The explain for this > second version is: > {code:java} > == Parsed Logical Plan == > 'Project [flatten(transform('numbers, lambdafunction(transform('letters, > lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, > NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, > false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Analyzed Logical Plan == > zipped: array> > Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, > lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda > x#446, false)), lambda x#445, false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Optimized Logical Plan == > LocalRelation [zipped#444] > == Physical Plan == > LocalTableScan [zipped#444] > {code} > Seems like variable name x is hardcoded. And sure enough: > https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL
[ https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34794: --- Labels: Correctness (was: ) > Nested higher-order functions broken in DSL > --- > > Key: SPARK-34794 > URL: https://issues.apache.org/jira/browse/SPARK-34794 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.1.1 > Environment: 3.1.1 >Reporter: Daniel Solow >Priority: Major > Labels: Correctness > > In Spark 3, if I have: > {code:java} > val df = Seq( > (Seq(1,2,3), Seq("a", "b", "c")) > ).toDF("numbers", "letters") > {code} > and I want to take the cross product of these two arrays, I can do the > following in SQL: > {code:java} > df.selectExpr(""" > FLATTEN( > TRANSFORM( > numbers, > number -> TRANSFORM( > letters, > letter -> (number AS number, letter AS letter) > ) > ) > ) AS zipped > """).show(false) > ++ > |zipped | > ++ > |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]| > ++ > {code} > This works fine. But when I try the equivalent using the scala DSL, the > result is wrong: > {code:java} > df.select( > f.flatten( > f.transform( > $"numbers", > (number: Column) => { f.transform( > $"letters", > (letter: Column) => { f.struct( > number.as("number"), > letter.as("letter") > ) } > ) } > ) > ).as("zipped") > ).show(10, false) > ++ > |zipped | > ++ > |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]| > ++ > {code} > Note that the numbers are not included in the output. The explain for this > second version is: > {code:java} > == Parsed Logical Plan == > 'Project [flatten(transform('numbers, lambdafunction(transform('letters, > lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, > NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, > false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Analyzed Logical Plan == > zipped: array> > Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, > lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda > x#446, false)), lambda x#445, false))) AS zipped#444] > +- Project [_1#303 AS numbers#308, _2#304 AS letters#309] >+- LocalRelation [_1#303, _2#304] > == Optimized Logical Plan == > LocalRelation [zipped#444] > == Physical Plan == > LocalTableScan [zipped#444] > {code} > Seems like variable name x is hardcoded. And sure enough: > https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34922) Use better CBO cost function
Tanel Kiis created SPARK-34922: -- Summary: Use better CBO cost function Key: SPARK-34922 URL: https://issues.apache.org/jira/browse/SPARK-34922 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis In SPARK-33935 we changed the CBO cost function such that it would be symmetric - A.betterThan(B) implies that !B.betterThan(A). Before, both could have been true. That change introduced a performance regression in some queries. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
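To make the symmetry property concrete, here is a sketch of a cost comparison where A.betterThan(B) rules out B.betterThan(A) by construction. This is only an illustration of the property described in the ticket; the PlanCost case class, the weight and the fields are hypothetical and are not Spark's actual CBO cost model.

{code:scala}
// Sketch only: comparing plans on one combined scalar guarantees that
// betterThan(a, b) and betterThan(b, a) can never both be true.
case class PlanCost(cardinality: BigInt, size: BigInt)

object CostComparisonSketch {
  private val cardinalityWeight = 0.7 // hypothetical weight between row count and size

  private def combined(c: PlanCost): Double =
    cardinalityWeight * c.cardinality.toDouble +
      (1 - cardinalityWeight) * c.size.toDouble

  // Strict comparison on the combined scalar: at most one direction can hold,
  // and both are false when the combined costs are equal.
  def betterThan(a: PlanCost, b: PlanCost): Boolean =
    combined(a) < combined(b)
}
{code}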
[jira] [Updated] (SPARK-34882) RewriteDistinctAggregates can cause a bug if the aggregator does not ignore NULLs
[ https://issues.apache.org/jira/browse/SPARK-34882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34882: --- Description: {code:title=group-by.sql} SELECT first(DISTINCT a), last(DISTINCT a), first(a), last(a), first(DISTINCT b), last(DISTINCT b), first(b), last(b) FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;{code} {code:title=group-by.sql.out} -- !query schema struct -- !query output NULL1 1 3 1 NULL1 2 {code} The results should not be NULL, because NULL inputs are filtered out. was: {code:title=group-by.sql} SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b) FROM testData WHERE a IS NOT NULL AND b IS NOT NULL; {code} {code:title=group-by.sql.out} -- !query SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b) FROM testData WHERE a IS NOT NULL AND b IS NOT NULL -- !query schema struct -- !query output 1 3 NULLNULL {code} The results should not be NULL, because NULL inputs are filtered out. > RewriteDistinctAggregates can cause a bug if the aggregator does not ignore > NULLs > - > > Key: SPARK-34882 > URL: https://issues.apache.org/jira/browse/SPARK-34882 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > {code:title=group-by.sql} > SELECT > first(DISTINCT a), last(DISTINCT a), > first(a), last(a), > first(DISTINCT b), last(DISTINCT b), > first(b), last(b) > FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;{code} > {code:title=group-by.sql.out} > -- !query schema > struct a):int,first(a):int,last(a):int,first(DISTINCT b):int,last(DISTINCT > b):int,first(b):int,last(b):int> > -- !query output > NULL 1 1 3 1 NULL1 2 > {code} > The results should not be NULL, because NULL inputs are filtered out. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34882) RewriteDistinctAggregates can cause a bug if the aggregator does not ignore NULLs
Tanel Kiis created SPARK-34882: -- Summary: RewriteDistinctAggregates can cause a bug if the aggregator does not ignore NULLs Key: SPARK-34882 URL: https://issues.apache.org/jira/browse/SPARK-34882 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis {code:title=group-by.sql} SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b) FROM testData WHERE a IS NOT NULL AND b IS NOT NULL; {code} {code:title=group-by.sql.out} -- !query SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b) FROM testData WHERE a IS NOT NULL AND b IS NOT NULL -- !query schema struct -- !query output 1 3 NULLNULL {code} The results should not be NULL, because NULL inputs are filtered out. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
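For anyone wanting to try the query above outside the SQL golden-file suite, here is a self-contained sketch. The in-memory rows are made up stand-ins for the testData fixture used by group-by.sql; the query shape (mixing DISTINCT and non-DISTINCT first/last over two different columns) is what triggers RewriteDistinctAggregates.

{code:scala}
import org.apache.spark.sql.SparkSession

object DistinctFirstLastRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("distinct-first-last").getOrCreate()
    import spark.implicits._

    // Illustrative stand-in for the testData fixture.
    Seq((1, 1), (1, 2), (2, 1), (3, 2)).toDF("a", "b").createOrReplaceTempView("testData")

    spark.sql(
      """SELECT
        |  first(DISTINCT a), last(DISTINCT a),
        |  first(a), last(a),
        |  first(DISTINCT b), last(DISTINCT b),
        |  first(b), last(b)
        |FROM testData WHERE a IS NOT NULL AND b IS NOT NULL""".stripMargin
    ).show(truncate = false)

    spark.stop()
  }
}
{code}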
[jira] [Updated] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery
[ https://issues.apache.org/jira/browse/SPARK-34876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34876: --- Affects Version/s: 2.4.7 3.0.2 3.1.1 > Non-nullable aggregates can return NULL in a correlated subquery > > > Key: SPARK-34876 > URL: https://issues.apache.org/jira/browse/SPARK-34876 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.7, 3.0.2, 3.2.0, 3.1.1 >Reporter: Tanel Kiis >Priority: Major > > Test case in scalar-subquery-select.sql: > {code:title=query} > SELECT t1a, > (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2, > (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2, > (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) > approx_count_distinct_t2, > (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2, > (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2, > (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = > t1a) collect_set_t2 > FROM t1; > {code} > {code:title=Result} > val1a 0 0 NULLNULLNULLNULL > val1a 0 0 NULLNULLNULLNULL > val1a 0 0 NULLNULLNULLNULL > val1a 0 0 NULLNULLNULLNULL > val1b 6 6 3 [19,119,319,19,19,19] [19,119,319] > 00010006000100045D8D6AB9000400010001 > val1c 2 2 2 [219,19][219,19] > 00010002000100045D8D6AB900010001 > val1d 0 0 NULLNULLNULLNULL > val1d 0 0 NULLNULLNULLNULL > val1d 0 0 NULLNULLNULLNULL > val1e 1 1 1 [19][19] > 00010001000100045D8D6AB90001 > val1e 1 1 1 [19][19] > 00010001000100045D8D6AB90001 > val1e 1 1 1 [19][19] > 00010001000100045D8D6AB90001 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery
Tanel Kiis created SPARK-34876: -- Summary: Non-nullable aggregates can return NULL in a correlated subquery Key: SPARK-34876 URL: https://issues.apache.org/jira/browse/SPARK-34876 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis {code:title=query} SELECT t1a, (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2, (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2, (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) approx_count_distinct_t2, (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2, (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2, (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) collect_set_t2 FROM t1; {code} {code:title=Result} val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1b 6 6 3 [19,119,319,19,19,19] [19,119,319] 00010006000100045D8D6AB9000400010001 val1c 2 2 2 [219,19][219,19] 00010002000100045D8D6AB900010001 val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery
[ https://issues.apache.org/jira/browse/SPARK-34876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34876: --- Description: Test case in scalar-subquery-select.sql: {code:title=query} SELECT t1a, (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2, (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2, (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) approx_count_distinct_t2, (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2, (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2, (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) collect_set_t2 FROM t1; {code} {code:title=Result} val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1b 6 6 3 [19,119,319,19,19,19] [19,119,319] 00010006000100045D8D6AB9000400010001 val1c 2 2 2 [219,19][219,19] 00010002000100045D8D6AB900010001 val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 {code} was: {code:title=query} SELECT t1a, (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2, (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2, (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) approx_count_distinct_t2, (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2, (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2, (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) collect_set_t2 FROM t1; {code} {code:title=Result} val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1a 0 0 NULLNULLNULLNULL val1b 6 6 3 [19,119,319,19,19,19] [19,119,319] 00010006000100045D8D6AB9000400010001 val1c 2 2 2 [219,19][219,19] 00010002000100045D8D6AB900010001 val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1d 0 0 NULLNULLNULLNULL val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 val1e 1 1 1 [19][19] 00010001000100045D8D6AB90001 {code} > Non-nullable aggregates can return NULL in a correlated subquery > > > Key: SPARK-34876 > URL: https://issues.apache.org/jira/browse/SPARK-34876 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Test case in scalar-subquery-select.sql: > {code:title=query} > SELECT t1a, > (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2, > (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2, > (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) > approx_count_distinct_t2, > (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2, > (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2, > (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = > t1a) collect_set_t2 > FROM t1; > {code} > {code:title=Result} > val1a 0 0 NULLNULLNULLNULL > val1a 0 0 NULLNULLNULLNULL > val1a 0 0 NULLNULL
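A quick way to see the behaviour described above outside the subquery test suite is sketched below, with made-up t1/t2 data rather than the ticket's fixtures: run directly over an empty input, collect_list yields an empty array, yet the same aggregate used as a correlated scalar subquery with no matching rows comes back NULL.

{code:scala}
import org.apache.spark.sql.SparkSession

object CorrelatedSubqueryNullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("correlated-subquery-null").getOrCreate()
    import spark.implicits._

    // Illustrative stand-ins for the t1/t2 fixtures referenced in the ticket.
    Seq("val1a", "val1b").toDF("t1a").createOrReplaceTempView("t1")
    Seq(("val1b", 19L)).toDF("t2a", "t2d").createOrReplaceTempView("t2")

    // Direct aggregation over an empty input: empty array, not NULL.
    spark.sql("SELECT collect_list(t2d) FROM t2 WHERE t2a = 'no-such-key'").show(truncate = false)

    // The same aggregate as a correlated scalar subquery: NULL for the unmatched 'val1a' row.
    spark.sql(
      """SELECT t1a,
        |       (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) AS collect_list_t2
        |FROM t1""".stripMargin
    ).show(truncate = false)

    spark.stop()
  }
}
{code}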
[jira] [Created] (SPARK-34822) Update plan stability golden files even if only explain differs
Tanel Kiis created SPARK-34822: -- Summary: Update plan stability golden files even if only explain differs Key: SPARK-34822 URL: https://issues.apache.org/jira/browse/SPARK-34822 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis PlanStabilitySuite updates the golden files only if simplified.txt has changed. In some situations only explain.txt will change and the golden files are not updated. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
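A minimal sketch of the intended behaviour, assuming the suite keeps one approved directory per query holding both simplified.txt and explain.txt (the file names and the helper below are illustrative, not the actual PlanStabilitySuite code):
{code:title=Sketch of the intended regeneration check}
import java.io.File
import java.nio.charset.StandardCharsets
import org.apache.commons.io.FileUtils

// Regenerate the golden files when either artifact differs, not only when
// simplified.txt has changed.
def shouldRegenerateGoldenFiles(
    approvedDir: File,
    actualSimplified: String,
    actualExplain: String): Boolean = {
  def approved(name: String): String =
    FileUtils.readFileToString(new File(approvedDir, name), StandardCharsets.UTF_8)
  approved("simplified.txt") != actualSimplified || approved("explain.txt") != actualExplain
}
{code}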
[jira] [Created] (SPARK-34812) RowNumberLike and RankLike should not be nullable
Tanel Kiis created SPARK-34812: -- Summary: RowNumberLike and RankLike should not be nullable Key: SPARK-34812 URL: https://issues.apache.org/jira/browse/SPARK-34812 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34623) Deduplicate window expressions
[ https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296767#comment-17296767 ] Tanel Kiis commented on SPARK-34623: I had a typo in the PR title, so it did not link up: https://github.com/apache/spark/pull/31740 > Deduplicate window expressions > -- > > Key: SPARK-34623 > URL: https://issues.apache.org/jira/browse/SPARK-34623 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Remove duplicate window expressions from the Window node -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34565) Collapse Window nodes with Project between them
[ https://issues.apache.org/jira/browse/SPARK-34565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34565: --- Description: The CollapseWindow optimizer rule can be improved to also collapse Window nodes, that have a Project between them. This sort of Window - Project - Window chains will happen when chaining the dataframe.withColumn calls. (was: The CollapseWindow optimizer rule can be imroved to also collapse Window nodes, that have a Project between them. This sort of Window - Project - Window chains will happen when chaining the dataframe.withColumn calls.) > Collapse Window nodes with Project between them > --- > > Key: SPARK-34565 > URL: https://issues.apache.org/jira/browse/SPARK-34565 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > The CollapseWindow optimizer rule can be improved to also collapse Window > nodes, that have a Project between them. This sort of Window - Project - > Window chains will happen when chaining the dataframe.withColumn calls. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
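For illustration, a typical way such a chain shows up (df is an arbitrary DataFrame with key, ts and value columns; the names are made up):
{code:title=withColumn chain producing Window - Project - Window}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("key").orderBy("ts")

// Each withColumn wraps the previous plan in a Project, so two window columns
// over the same window specification yield a Window - Project - Window chain
// before optimization; CollapseWindow could merge them into a single Window.
val out = df
  .withColumn("rn", row_number().over(w))
  .withColumn("prev_value", lag("value", 1).over(w))
{code}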
[jira] [Updated] (SPARK-34623) Deduplicate window expressions
[ https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34623: --- Issue Type: Improvement (was: Bug) > Deduplicate window expressions > -- > > Key: SPARK-34623 > URL: https://issues.apache.org/jira/browse/SPARK-34623 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Remove duplicate window expressions from the Window node -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34644) UDF returning array followed by explode calls the UDF multiple times and could return wrong results
[ https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296556#comment-17296556 ] Tanel Kiis commented on SPARK-34644: UDF with internal state should be marked as non-deterministic with the `asNondeterministic()` method on it. If the issue persists after that, then it should be considered a bug. > UDF returning array followed by explode calls the UDF multiple times and > could return wrong results > --- > > Key: SPARK-34644 > URL: https://issues.apache.org/jira/browse/SPARK-34644 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Gavrilescu Laurentiu >Priority: Major > > *Applying an UDF followed by explode calls the UDF multiple times.* > Using *persist* after applying the UDF mitigates the problem > Consider the following code to reproduce it: > {code:java} > object Bug { > def main(args: Array[String]) { > val sparkSession: SparkSession = > SparkSession.builder.master("local[4]").getOrCreate() > val invocations = sparkSession.sparkContext.longAccumulator("invocations") > def showTiming[T](body: => T): T = { > val t0 = System.nanoTime() > invocations.reset() > val res = body > val t1 = System.nanoTime() > println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}") > res > } > def expensive(n: Int) = { > Thread.sleep(100) > invocations.add(1) > 1 > } > val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) }) > val df = sparkSession.range(10).toDF() > showTiming(df > .select(explode(expensiveUdf(col("id" > .select(sum(col("col"))) > .show()) > showTiming(df.select(expensiveUdf(col("id")).as("values")) > .persist() > .select(explode(col("values"))) > .select(sum("col")) > .show()) > } > } > {code} > => > {code:java} > first: invocations=300, time=11.342076635 > second: invocations=100, time=3.351682967{code} > This can have undesired behavior and can return wrong results if a managed > state is used inside the UDF. > Imagine having the following scenario: > 1. you have a dataframe with some string columns > 2. you have an expensive function that creates a score based on some string > input > 3. 
you want to get all the distinct values from all the columns and their > score - there is an executor level cache that holds the score values for > strings to minimize the execution of the expensive function > consider the following code to reproduce it: > {code:java} > case class RowWithStrings(c1: String, c2: String, c3: String) > case class ValueScore(value: String, score: Double) > object Bug { > val columns: List[String] = List("c1", "c2", "c3") > def score(input: String): Double = { > // insert expensive function here > input.toDouble > } > def main(args: Array[String]) { > lazy val sparkSession: SparkSession = { > val sparkSession = SparkSession.builder.master("local[4]") > .getOrCreate() > sparkSession > } > // some cache over expensive operation > val cache: TrieMap[String, Double] = TrieMap[String, Double]() > // get scores for all columns in the row > val body = (row: Row) => { > val arr = ArrayBuffer[ValueScore]() > columns foreach { > column => > val value = row.getAs[String](column) > if (!cache.contains(value)) { > val computedScore = score(value) > arr += ValueScore(value, computedScore) > cache(value) = computedScore > } > } > arr > } > val basicUdf = udf(body) > val values = (1 to 5) map { > idx => > // repeated values > RowWithStrings(idx.toString, idx.toString, idx.toString) > } > import sparkSession.implicits._ > val df = values.toDF("c1", "c2", "c3").persist() > val allCols = df.columns.map(col) > df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) > .select(explode(col("valuesScore"))) > .distinct() > .show() > } > } > {code} > this shows: > {code:java} > +---+ > |col| > +---+ > +---+ > {code} > When adding persist before explode, the result is correct: > {code:java} > df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) > .persist() > .select(explode(col("valuesScore"))) > .distinct() > .show() > {code} > => > {code:java} > ++ > | col| > ++ > |{2, 2.0}| > |{4, 4.0}| > |{3, 3.0}| > |{5, 5.0}| > |{1, 1.0}| > ++ > {code} > This is not reproducible using 3.0.2 version. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To
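For reference, a minimal sketch of the suggested workaround (the UDF body and df are placeholders):
{code:title=Marking a stateful UDF as non-deterministic}
import org.apache.spark.sql.functions.{col, explode, udf}

// asNondeterministic() tells the optimizer it must not duplicate, reorder or
// re-evaluate this UDF, which matters when the UDF keeps internal state.
val expensiveUdf = udf((x: Int) => (1 to 10).map(_ => x)).asNondeterministic()

val result = df.select(explode(expensiveUdf(col("id"))).as("col"))
{code}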
[jira] [Updated] (SPARK-34623) Deduplicate window expressions
[ https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-34623: --- Description: Remove duplicate window expressions from the Window node (was: Remove duplicate window expressions in the Window node) > Deduplicate window expressions > -- > > Key: SPARK-34623 > URL: https://issues.apache.org/jira/browse/SPARK-34623 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Remove duplicate window expressions from the Window node -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34623) Deduplicate window expressions
Tanel Kiis created SPARK-34623: -- Summary: Deduplicate window expressions Key: SPARK-34623 URL: https://issues.apache.org/jira/browse/SPARK-34623 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis Remove duplicate window expressions in the Window node -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34565) Collapse Window nodes with Project between them
Tanel Kiis created SPARK-34565: -- Summary: Collapse Window nodes with Project between them Key: SPARK-34565 URL: https://issues.apache.org/jira/browse/SPARK-34565 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis The CollapseWindow optimizer rule can be improved to also collapse Window nodes, that have a Project between them. This sort of Window - Project - Window chains will happen when chaining the dataframe.withColumn calls. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34141) ExtractGenerator analyzer should handle lazy projectlists
Tanel Kiis created SPARK-34141: -- Summary: ExtractGenerator analyzer should handle lazy projectlists Key: SPARK-34141 URL: https://issues.apache.org/jira/browse/SPARK-34141 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis With the DataFrame API it is possible to have a lazy sequence as the output field of a LogicalPlan class. When exploding a column on such a dataframe using the withColumn method, the ExtractGenerator rule does not extract the generator. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
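A hypothetical way to end up in that situation (the Stream-based select below is only an illustration, not the exact reproduction from the report; df is assumed to have an id column and an array column arr):
{code:title=Lazy select list followed by an explode}
import org.apache.spark.sql.functions.{col, explode}

// Building the select list from a lazy collection keeps the projectList of the
// resulting Project node lazily evaluated.
val cols: Stream[String] = Stream("id", "arr")
val projected = df.select(cols.map(col): _*)

// Exploding on top of such a plan via withColumn is where ExtractGenerator
// reportedly fails to extract the generator.
val exploded = projected.withColumn("elem", explode(col("arr")))
{code}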
[jira] [Resolved] (SPARK-34014) Ignore Distinct if it is the right child of a left semi or anti join
[ https://issues.apache.org/jira/browse/SPARK-34014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis resolved SPARK-34014. Resolution: Won't Fix Can cause performance regression > Ignore Distinct if it is the right child of a left semi or anti join > > > Key: SPARK-34014 > URL: https://issues.apache.org/jira/browse/SPARK-34014 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > Left semi and anti joins ignore duplicates in the right child. Finding > distinct values there will only add overhead. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34014) Ignore Distinct if it is the right child of a left semi or anti join
Tanel Kiis created SPARK-34014: -- Summary: Ignore Distinct if it is the right child of a left semi or anti join Key: SPARK-34014 URL: https://issues.apache.org/jira/browse/SPARK-34014 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis Left semi and anti joins ignore duplicates in the right child. Finding distinct values there will only add overhead. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
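An illustrative example (left and right are hypothetical DataFrames sharing an id column):
{code:title=Distinct on the right side of a left semi join}
// A left semi join returns each left row at most once, regardless of how many
// matching rows the right side has, so de-duplicating the right side first
// only adds an extra aggregate.
val withDistinct = left.join(right.select("id").distinct(), Seq("id"), "left_semi")
val withoutDistinct = left.join(right.select("id"), Seq("id"), "left_semi")
// Both produce the same result; the Distinct could be ignored by the optimizer.
{code}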
[jira] [Created] (SPARK-33971) Eliminate distinct from more aggregates
Tanel Kiis created SPARK-33971: -- Summary: Eliminate distinct from more aggregates Key: SPARK-33971 URL: https://issues.apache.org/jira/browse/SPARK-33971 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis The EliminateDistinct rule removes distinct from min and max aggregates. It could remove it from several more aggregates: * BitAndAgg * BitOrAgg * First * Last * HyperLogLogPlusPlus * CollectSet -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
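As an illustration, DISTINCT does not affect the result of these aggregates, so queries like the following are equivalent (t and x are a hypothetical table and column):
{code:title=DISTINCT that can be eliminated}
// For max/min, first/last, collect_set, etc. de-duplicating the input first
// cannot change the aggregate's result, it only adds work.
spark.sql("SELECT max(DISTINCT x), collect_set(DISTINCT x) FROM t")
spark.sql("SELECT max(x), collect_set(x) FROM t")
{code}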
[jira] [Updated] (SPARK-33964) Combine distinct unions in more cases
[ https://issues.apache.org/jira/browse/SPARK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33964: --- Description: In several TPCDS queries the CombineUnions rule does not manage to combine unions, because they have noop Projects between them. The Projects will be removed by RemoveNoopOperators, but by then ReplaceDistinctWithAggregate has been applied and there are aggregates between the unions. > Combine distinct unions in more cases > - > > Key: SPARK-33964 > URL: https://issues.apache.org/jira/browse/SPARK-33964 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > In several TPCDS queries the CombineUnions rule does not manage to combine > unions, because they have noop Projects between them. > The Projects will be removed by RemoveNoopOperators, but by then > ReplaceDistinctWithAggregate has been applied and there are aggregates > between the unions. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
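A hypothetical shape of the problem (the no-op select stands in for the Project that the real queries leave between the Unions; df1, df2 and df3 are placeholders):
{code:title=Distinct unions separated by a no-op Project}
// Each distinct() becomes an Aggregate once ReplaceDistinctWithAggregate runs.
// If the intermediate no-op Project is still present at that point, CombineUnions
// has not flattened the two Unions and the extra Aggregate stays in the plan.
val combined = df1.select("a")
  .union(df2.select("a")).distinct()
  .select("a") // no-op Project between the two unions
  .union(df3.select("a")).distinct()
{code}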
[jira] [Created] (SPARK-33964) Combine distinct unions in more cases
Tanel Kiis created SPARK-33964: -- Summary: Combine distinct unions in more cases Key: SPARK-33964 URL: https://issues.apache.org/jira/browse/SPARK-33964 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33935) Fix CBOs cost function
[ https://issues.apache.org/jira/browse/SPARK-33935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33935: --- Issue Type: Bug (was: Improvement) > Fix CBOs cost function > --- > > Key: SPARK-33935 > URL: https://issues.apache.org/jira/browse/SPARK-33935 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Major > > The parameter spark.sql.cbo.joinReorder.card.weight is decumented as: > {code:title=spark.sql.cbo.joinReorder.card.weight} > The weight of cardinality (number of rows) for plan cost comparison in join > reorder: rows * weight + size * (1 - weight). > {code} > But in the implementation the formula is a bit different: > {code:title=Current implementation} > def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { > if (other.planCost.card == 0 || other.planCost.size == 0) { > false > } else { > val relativeRows = BigDecimal(this.planCost.card) / > BigDecimal(other.planCost.card) > val relativeSize = BigDecimal(this.planCost.size) / > BigDecimal(other.planCost.size) > relativeRows * conf.joinReorderCardWeight + > relativeSize * (1 - conf.joinReorderCardWeight) < 1 > } > } > {code} > This change has an unfortunate consequence: > given two plans A and B, both A betterThan B and B betterThan A might give > the same results. This happes when one has many rows with small sizes and > other has few rows with large sizes. > A example values, that have this fenomen with the default weight value (0.7): > A.card = 500, B.card = 300 > A.size = 30, B.size = 80 > Both A betterThan B and B betterThan A would have score above 1 and would > return false. > A new implementation is proposed, that matches the documentation: > {code:title=Proposed implementation} > def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { > val oldCost = BigDecimal(this.planCost.card) * > conf.joinReorderCardWeight + > BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight) > val newCost = BigDecimal(other.planCost.card) * > conf.joinReorderCardWeight + > BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight) > newCost < oldCost > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33935) Fix CBOs cost function
Tanel Kiis created SPARK-33935: -- Summary: Fix CBOs cost function Key: SPARK-33935 URL: https://issues.apache.org/jira/browse/SPARK-33935 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis The parameter spark.sql.cbo.joinReorder.card.weight is documented as: {code:title=spark.sql.cbo.joinReorder.card.weight} The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight). {code} But in the implementation the formula is a bit different: {code:title=Current implementation} def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { if (other.planCost.card == 0 || other.planCost.size == 0) { false } else { val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card) val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size) relativeRows * conf.joinReorderCardWeight + relativeSize * (1 - conf.joinReorderCardWeight) < 1 } } {code} This change has an unfortunate consequence: given two plans A and B, both A betterThan B and B betterThan A can return false. This happens when one plan has many rows with a small size and the other has few rows with a large size. Example values that exhibit this phenomenon with the default weight value (0.7): A.card = 500, B.card = 300 A.size = 30, B.size = 80 Both A betterThan B and B betterThan A would have a score above 1 and would return false. A new implementation is proposed that matches the documentation: {code:title=Proposed implementation} def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { val oldCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight + BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight) val newCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight + BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight) newCost < oldCost } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
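Plugging the example numbers into the current formula (a quick arithmetic check of the values above, not Spark code):
{code:title=Worked example with weight = 0.7}
val w = 0.7
// A betterThan B: (500/300) * 0.7 + (30/80) * 0.3 ~ 1.279, which is not < 1
val aBetterThanB = (500.0 / 300.0) * w + (30.0 / 80.0) * (1 - w)
// B betterThan A: (300/500) * 0.7 + (80/30) * 0.3 ~ 1.220, which is not < 1
val bBetterThanA = (300.0 / 500.0) * w + (80.0 / 30.0) * (1 - w)
// Neither comparison goes below 1, so neither plan is considered better.
{code}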
[jira] [Updated] (SPARK-33070) Optimizer rules for HigherOrderFunctions
[ https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33070: --- Summary: Optimizer rules for HigherOrderFunctions (was: Optimizer rules for collection datatypes and SimpleHigherOrderFunction) > Optimizer rules for HigherOrderFunctions > > > Key: SPARK-33070 > URL: https://issues.apache.org/jira/browse/SPARK-33070 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Minor > > SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be > combined and reordered to achieve more optimal plan. > Possible rules: > * Combine 2 consecutive array transforms > * Combine 2 consecutive array filters > * Push array filter through array sort > * Remove array sort before array exists and array forall. > * Combine 2 consecutive map filters -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33070) Optimizer rules for collection datatypes and SimpleHigherOrderFunction
[ https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33070: --- Affects Version/s: (was: 3.1.0) 3.2.0 > Optimizer rules for collection datatypes and SimpleHigherOrderFunction > -- > > Key: SPARK-33070 > URL: https://issues.apache.org/jira/browse/SPARK-33070 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Tanel Kiis >Priority: Minor > > SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be > combined and reordered to achieve more optimal plan. > Possible rules: > * Combine 2 consecutive array transforms > * Combine 2 consecutive array filters > * Push array filter through array sort > * Remove array sort before array exists and array forall. > * Combine 2 consecutive map filters -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33851) Push partial aggregates below exchanges
Tanel Kiis created SPARK-33851: -- Summary: Push partial aggregates below exchanges Key: SPARK-33851 URL: https://issues.apache.org/jira/browse/SPARK-33851 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Tanel Kiis The `EnsureRequirements` rule inserts exchanges between partial and final aggregates to reduce the exchanged data amount. However, if the user manually partitions the data correctly, then the partial aggregate will be placed after the exchange. If possible we should push partial aggregates below the manually inserted exchanges to reduce the exchanged data amount. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
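A hypothetical illustration of the situation (the key and value column names are made up):
{code:title=Manual repartition followed by an aggregation}
import org.apache.spark.sql.functions.{col, sum}

// The repartition already satisfies the distribution required by the final
// aggregate, so EnsureRequirements adds no new exchange and the partial
// aggregate ends up above the user's repartition. The full, un-reduced data
// is therefore shuffled; pushing the partial aggregate below the repartition
// would shrink the shuffled data.
val result = df
  .repartition(col("key"))
  .groupBy("key")
  .agg(sum("value"))
{code}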
[jira] [Commented] (SPARK-32110) -0.0 vs 0.0 is inconsistent
[ https://issues.apache.org/jira/browse/SPARK-32110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247357#comment-17247357 ] Tanel Kiis commented on SPARK-32110: [~cloud_fan] fixed issue I mentioned in the first comment in this PR: https://github.com/apache/spark/pull/29647 > -0.0 vs 0.0 is inconsistent > --- > > Key: SPARK-32110 > URL: https://issues.apache.org/jira/browse/SPARK-32110 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Robert Joseph Evans >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.0.2, 3.1.0 > > > This is related to SPARK-26021 where some things were fixed but there is > still a lot that is not consistent. > When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick > results that appear to be correct but are totally inconsistent for the same > operators. > {code:java} > scala> import spark.implicits._ > import spark.implicits._ > scala> spark.sql("SELECT 0.0 = -0.0").collect > res0: Array[org.apache.spark.sql.Row] = Array([true]) > scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect > res1: Array[org.apache.spark.sql.Row] = Array([false]) > {code} > This also shows up in sorts > {code:java} > scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, > -100.0)).toDF("a", "b").orderBy("a", "b").collect > res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], > [0.0,-100.0], [0.0,100.0]) > {code} > But not for a equi-join or for an aggregate > {code:java} > scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", > "r_b"), $"a" === $"r_a").collect > res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0]) > scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect > res6: Array[org.apache.spark.sql.Row] = Array([0.0,2]) > {code} > This can lead to some very odd results. Like an equi-join with a filter that > logically should do nothing, but ends up filtering the result to nothing. > {code:java} > scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", > "r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect > res8: Array[org.apache.spark.sql.Row] = Array() > scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", > "r_b"), $"a" === $"r_a").collect > res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0]) > {code} > Hive never normalizes -0.0 to 0.0 so this results in non-ieee complaint > behavior everywhere, but at least it is consistently odd. > MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to > {{0.0}}. > The root cause of this appears to be that the java implementation of > {{Double.compare}} and {{Float.compare}} for open JDK places {{-0.0}} < > {{0.0}}. > This is not documented in the java docs but it is clearly documented in the > code, so it is not a "bug" that java is going to fix. > [https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035] > It is also consistent with what is in the java docs for {{Double.equals}} > > [https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-] > To be clear I am filing this mostly to document the current state rather than > to think it needs to be fixed ASAP. It is a rare corner case, but ended up > being really frustrating for me to debug what was happening. 
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33225) Extract AliasHelper trait
Tanel Kiis created SPARK-33225: -- Summary: Extract AliasHelper trait Key: SPARK-33225 URL: https://issues.apache.org/jira/browse/SPARK-33225 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Tanel Kiis During SPARK-33122 we saw that there are several alias related methods duplicated between optimizers and analyzers. To keep that PR more concise, let's extract AliasHelper in a separate PR. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33177) CollectList and CollectSet should not be nullable
Tanel Kiis created SPARK-33177: -- Summary: CollectList and CollectSet should not be nullable Key: SPARK-33177 URL: https://issues.apache.org/jira/browse/SPARK-33177 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Tanel Kiis The CollectList and CollectSet SQL expressions never return a null value. Marking them as non-nullable can have some performance benefits, because some optimizer rules apply only to non-nullable expressions. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
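A small check of the claim (the expected output is described in a comment and assumes a local run):
{code:title=collect_list over empty input}
import org.apache.spark.sql.functions.collect_list

// Even with no input rows the aggregate yields an empty array, not NULL,
// which is why the expression can safely be declared non-nullable.
spark.range(0).agg(collect_list("id")).show()
// expected to print a single row containing an empty array []
{code}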
[jira] [Created] (SPARK-33122) Remove redundant aggregates in the Optimizer
Tanel Kiis created SPARK-33122: -- Summary: Remove redundant aggregates in the Optimizer Key: SPARK-33122 URL: https://issues.apache.org/jira/browse/SPARK-33122 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.1 Reporter: Tanel Kiis It is possible to have two or more consecutive aggregates whose sole purpose is to keep only distinct values (for example TPCDS q87). We can remove all but the last one to improve performance. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
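A trivial illustration of such a redundant pair (real cases like TPCDS q87 arise from unions of distinct selections; df and column a are placeholders):
{code:title=Consecutive deduplicating aggregates}
// Both distinct() calls become Aggregate nodes that only de-duplicate on "a";
// keeping just the outermost one yields the same result with less work.
val deduped = df.select("a").distinct().distinct()
{code}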
[jira] [Updated] (SPARK-33070) Optimizer rules for collection datatypes and SimpleHigherOrderFunction
[ https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33070: --- Summary: Optimizer rules for collection datatypes and SimpleHigherOrderFunction (was: Optimizer rules for SimpleHigherOrderFunction) > Optimizer rules for collection datatypes and SimpleHigherOrderFunction > -- > > Key: SPARK-33070 > URL: https://issues.apache.org/jira/browse/SPARK-33070 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Minor > > SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be > combined and reordered to achieve more optimal plan. > Possible rules: > * Combine 2 consecutive array transforms > * Combine 2 consecutive array filters > * Push array filter through array sort > * Remove array sort before array exists and array forall. > * Combine 2 consecutive map filters -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33070) Optimizer rules for SimpleHigherOrderFunction
Tanel Kiis created SPARK-33070: -- Summary: Optimizer rules for SimpleHigherOrderFunction Key: SPARK-33070 URL: https://issues.apache.org/jira/browse/SPARK-33070 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Tanel Kiis SimpleHigherOrderFunction expressions like ArrayTransform, ArrayFilter, etc. can be combined and reordered to achieve a more optimal plan. Possible rules: * Combine 2 consecutive array transforms * Combine 2 consecutive array filters * Push array filter through array sort * Remove array sort before array exists and array forall. * Combine 2 consecutive map filters -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
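For example, two consecutive array transforms could be fused into one (xs is a hypothetical array column):
{code:title=Combining two consecutive array transforms}
import org.apache.spark.sql.functions.{col, transform}

// transform(transform(xs, f), g) is equivalent to transform(xs, g compose f),
// so the two passes over the array can be collapsed into one.
val twoPasses = df.select(transform(transform(col("xs"), x => x + 1), x => x * 2).as("ys"))
val onePass   = df.select(transform(col("xs"), x => (x + 1) * 2).as("ys"))
{code}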
[jira] [Updated] (SPARK-33070) Optimizer rules for SimpleHigherOrderFunction
[ https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-33070: --- Priority: Minor (was: Major) > Optimizer rules for SimpleHigherOrderFunction > - > > Key: SPARK-33070 > URL: https://issues.apache.org/jira/browse/SPARK-33070 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Minor > > SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be > combined and reordered to achieve more optimal plan. > Possible rules: > * Combine 2 consecutive array transforms > * Combine 2 consecutive array filters > * Push array filter through array sort > * Remove array sort before array exists and array forall. > * Combine 2 consecutive map filters -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32995) CostBasedJoinReorder optimizer rule should be idempotent
Tanel Kiis created SPARK-32995: -- Summary: CostBasedJoinReorder optimizer rule should be idempotent Key: SPARK-32995 URL: https://issues.apache.org/jira/browse/SPARK-32995 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Tanel Kiis The CostBasedJoinReorder batch currently has the following comment: {code} // Since join costs in AQP can change between multiple runs, there is no reason that we have an // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once. {code} This reasoning is incorrect. In the optimizer, a batch is called idempotent when applying it a second time in a row (without anything else changing) does not change the plan at all. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
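In other words, an idempotent batch satisfies a check like this (a sketch, assuming an Optimize object whose execute runs only the batch in question):
{code:title=Idempotence check for an optimizer batch}
// Applying the batch a second time, with nothing else changed in between,
// must leave the plan untouched.
val once  = Optimize.execute(originalPlan)
val twice = Optimize.execute(once)
assert(once == twice)
{code}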
[jira] [Created] (SPARK-32970) Reduce the runtime of unit test for SPARK-32019
Tanel Kiis created SPARK-32970: -- Summary: Reduce the runtime of unit test for SPARK-32019 Key: SPARK-32970 URL: https://issues.apache.org/jira/browse/SPARK-32970 Project: Spark Issue Type: Improvement Components: SQL, Tests Affects Versions: 3.1.0 Reporter: Tanel Kiis The UT for SPARK-32019 can take over 7 minutes on Jenkins. This sort of simple UT should run in a few seconds - definitely in less than a minute. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR
[ https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-32928: --- Labels: correctness (was: CorrectnessBug) > Non-deterministic expressions should not be reordered inside AND and OR > --- > > Key: SPARK-32928 > URL: https://issues.apache.org/jira/browse/SPARK-32928 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Major > Labels: correctness > > Using the splitDisjunctivePredicates and splitConjunctivePredicates helper > methods can change the number of times a non-deterministic expression is > executed. This can cause correctness issues on the client side. > An existing test in the FilterPushdownSuite seems to exhibit this problem > {code} > test("generate: non-deterministic predicate referenced no generated column") { > val originalQuery = { > testRelationWithArrayType > .generate(Explode('c_arr), alias = Some("arr")) > .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6)) > } > val optimized = Optimize.execute(originalQuery.analyze) > val correctAnswer = { > testRelationWithArrayType > .where('b >= 5) > .generate(Explode('c_arr), alias = Some("arr")) > .where('a + Rand(10).as("rnd") > 6 && 'col > 6) > .analyze > } > comparePlans(optimized, correctAnswer) > } > {code} > In the optimized plan, the deterministic filter is moved ahead of the > non-deterministic one: > {code} > Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as > double > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR
[ https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanel Kiis updated SPARK-32928: --- Labels: CorrectnessBug (was: ) > Non-deterministic expressions should not be reordered inside AND and OR > --- > > Key: SPARK-32928 > URL: https://issues.apache.org/jira/browse/SPARK-32928 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Major > Labels: CorrectnessBug > > Using the splitDisjunctivePredicates and splitConjunctivePredicates helper > methods can change the number of times a non-deterministic expression is > executed. This can cause correctness issues on the client side. > An existing test in the FilterPushdownSuite seems to exhibit this problem > {code} > test("generate: non-deterministic predicate referenced no generated column") { > val originalQuery = { > testRelationWithArrayType > .generate(Explode('c_arr), alias = Some("arr")) > .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6)) > } > val optimized = Optimize.execute(originalQuery.analyze) > val correctAnswer = { > testRelationWithArrayType > .where('b >= 5) > .generate(Explode('c_arr), alias = Some("arr")) > .where('a + Rand(10).as("rnd") > 6 && 'col > 6) > .analyze > } > comparePlans(optimized, correctAnswer) > } > {code} > In the optimized plan, the deterministic filter is moved ahead of the > non-deterministic one: > {code} > Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as > double > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR
[ https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199646#comment-17199646 ] Tanel Kiis commented on SPARK-32928: One more point, where this can manifest is FilterExec reordering isNotNull predicates {code:title=Test SQL file} -- Test window operator with codegen on and off. --CONFIG_DIM1 spark.sql.codegen.wholeStage=true --CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY --CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 AS testData(a); SELECT a FROM testData WHERE NOT ISNULL(IF(RAND(0) > 0.5, NULL, a)) AND RAND(1) > 0.5; {code} {code:title=Generated output file} - Automatically generated by SQLQueryTestSuite -- Number of queries: 2 -- !query CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 AS testData(a) -- !query schema struct<> -- !query output -- !query SELECT a FROM testData WHERE NOT ISNULL(IF(RAND(0) > 0.5, NULL, a)) AND RAND(1) > 0.5 -- !query schema struct -- !query output 3 4 8 {code} {code:title=Error on running the test} 23:16:44.013 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs: spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY [info] - deterministic.sql *** FAILED *** (1 second, 955 milliseconds) [info] deterministic.sql [info] Expected "3 [info] 4 [info] 8[]", but got "3 [info] 4 [info] 8[ [info] 9]" Result did not match for query #1 {code} > Non-deterministic expressions should not be reordered inside AND and OR > --- > > Key: SPARK-32928 > URL: https://issues.apache.org/jira/browse/SPARK-32928 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 >Reporter: Tanel Kiis >Priority: Major > > Using the splitDisjunctivePredicates and splitConjunctivePredicates helper > methods can change the number of times a non-deterministic expression is > executed. This can cause correctness issues on the client side. > An existing test in the FilterPushdownSuite seems to exhibit this problem > {code} > test("generate: non-deterministic predicate referenced no generated column") { > val originalQuery = { > testRelationWithArrayType > .generate(Explode('c_arr), alias = Some("arr")) > .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6)) > } > val optimized = Optimize.execute(originalQuery.analyze) > val correctAnswer = { > testRelationWithArrayType > .where('b >= 5) > .generate(Explode('c_arr), alias = Some("arr")) > .where('a + Rand(10).as("rnd") > 6 && 'col > 6) > .analyze > } > comparePlans(optimized, correctAnswer) > } > {code} > In the optimized plan, the deterministic filter is moved ahead of the > non-deterministic one: > {code} > Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as > double > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org