[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753471#comment-17753471 ]

Huw commented on SPARK-37487:
-----------------------------

I think I've seen crashes because of this in production. I can't reproduce it locally, but I believe that typed imperative aggregates are having their `serializeAggregateBufferInPlace` function called twice; the second time, it performs an unsafe cast on a byte buffer.

{quote}
Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest is in unnamed module of loader 'app')
	at org.apache.spark.sql.catalyst.expressions.aggregate.ApproxQuantiles.serialize(ApproxQuantiles.scala:19)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
{quote}

> CollectMetrics is executed twice if it is followed by a sort
> -------------------------------------------------------------
>
>                 Key: SPARK-37487
>                 URL: https://issues.apache.org/jira/browse/SPARK-37487
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Tanel Kiis
>            Priority: Major
>              Labels: correctness
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
> test("SPARK-37487: get observable metrics with sort by callback") {
>   val df = spark.range(100)
>     .observe(
>       name = "my_event",
>       min($"id").as("min_val"),
>       max($"id").as("max_val"),
>       // Test unresolved alias
>       sum($"id"),
>       count(when($"id" % 2 === 0, 1)).as("num_even"))
>     .observe(
>       name = "other_event",
>       avg($"id").cast("int").as("avg_val"))
>     .sort($"id".desc)
>
>   validateObservedMetrics(df)
> }
> {code}
> The count and sum aggregates report twice the number of rows:
> {code}
> [info] - SPARK-37487: get observable metrics with sort by callback *** FAILED *** (169 milliseconds)
> [info]   [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
> [info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
> [info]   at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
> [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> {code}
> I could not figure out how this happens. Hopefully the UT can help with debugging.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
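To illustrate the failure mode Huw describes, here is a minimal, hypothetical sketch of the in-place buffer-serialization pattern that TypedImperativeAggregate-style aggregates use. The class and method names below are illustrative only, not Spark's actual implementation: the buffer slot holds either the live aggregation object or, after serialization, its raw bytes, and the coercion back to the object type is an unchecked cast. Calling the serialization a second time therefore casts a byte array to the digest type and fails, matching the ClassCastException in the stack trace above.

```scala
// Hypothetical stand-in for a typed aggregation state (e.g. a percentile digest).
class Digest {
  def toBytes: Array[Byte] = Array[Byte](1, 2, 3)
}

// The buffer slot is typed Any: it holds a Digest before serialization
// and an Array[Byte] afterwards -- the same slot, reused in place.
class Buffer {
  var slot: Any = new Digest
}

def serializeInPlace(buf: Buffer): Unit = {
  // Unchecked cast: only safe if the slot still holds a live Digest.
  val digest = buf.slot.asInstanceOf[Digest]
  buf.slot = digest.toBytes
}

val buf = new Buffer
serializeInPlace(buf)   // ok: Digest replaced by Array[Byte]
// serializeInPlace(buf)
// A second call throws java.lang.ClassCastException:
// class [B cannot be cast to class Digest
```

This is why executing the metrics collection twice is not merely a double-count: the second pass re-serializes a buffer that has already been replaced by its byte representation.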
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17451530#comment-17451530 ]

Apache Spark commented on SPARK-37487:
--------------------------------------

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/34765
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17451482#comment-17451482 ]

Hyukjin Kwon commented on SPARK-37487:
--------------------------------------

cc [~beliefer] FYI
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17451376#comment-17451376 ]

Kousuke Saruta commented on SPARK-37487:
----------------------------------------

[~tanelk] Thank you for pinging me. I think a sampling job for the global sort performs the extra CollectMetrics (operations before the sort are performed twice). Please let me look into it more.
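The sampling behavior Kousuke describes can be sketched with the plain RDD API (this is an illustration of the mechanism, not the SQL planner's code path): a global sort builds a RangePartitioner, which first runs a sampling job over the upstream RDD to choose partition bounds, so any side effect in the upstream lineage fires once for the sampling pass and again for the sort itself. Here a LongAccumulator stands in for the accumulator behind CollectMetrics.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val sc = spark.sparkContext

// Accumulator standing in for the metrics accumulator of CollectMetrics.
val rows = sc.longAccumulator("rows")

// Upstream lineage with a side effect: count every row that passes through.
val counted = sc.range(0, 100).map { x => rows.add(1); x }

// Global sort: RangePartitioner samples `counted` to pick bounds,
// then the sort evaluates `counted` again.
counted.sortBy(identity, ascending = false).collect()

// rows.value can be ~200 rather than 100: the map ran once during
// the sampling job and once during the actual sort.
println(rows.value)
```

Under this reading, the observed metrics in the UT doubling exactly (9900 vs 4950, 100 vs 50) is consistent with the whole pre-sort pipeline, CollectMetrics included, executing twice.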
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450452#comment-17450452 ]

Tanel Kiis commented on SPARK-37487:
------------------------------------

[~cloud_fan] and [~sarutak], you helped with the last CollectMetrics bug. Perhaps you have some idea why this is happening.