Kristin Cowalcijk created SEDONA-624:
----------------------------------------
             Summary: Distance join throws java.lang.reflect.InvocationTargetException when working with AQE
                 Key: SEDONA-624
                 URL: https://issues.apache.org/jira/browse/SEDONA-624
             Project: Apache Sedona
          Issue Type: Bug
    Affects Versions: 1.6.0
            Reporter: Kristin Cowalcijk
             Fix For: 1.6.1


This is the minimal code for reproducing the problem:

{code:scala}
// Assumes a Sedona-enabled SparkSession named `spark`, with Spark SQL
// functions and Sedona's DataFrame-style ST functions in scope:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sedona_sql.expressions.st_functions.ST_Distance

// Force a partitioned (non-broadcast) spatial join.
spark.conf.set("sedona.join.autoBroadcastJoinThreshold", "-1")

val kDistanceDf = spark.range(0, 10)
  .withColumn("x", rand())
  .withColumn("y", rand())
  .withColumn("geometry", expr("st_point(x, y)"))
  .withColumn("k_distance", rand())

val formattedDataframe = spark.range(0, 10)
  .withColumn("x", rand())
  .withColumn("y", rand())
  .withColumn("geometry", expr("st_point(x, y)"))

val neighborsDf = kDistanceDf.alias("l").join(
  formattedDataframe.alias("r"),
  ST_Distance(col("l.geometry"), col("r.geometry")) <= col("l.k_distance"))

neighborsDf.selectExpr("COUNT(*)").explain()
neighborsDf.selectExpr("COUNT(*)").show()
{code}

The {{.show()}} call throws an exception:

{code}
java.lang.reflect.InvocationTargetException was thrown.
java.lang.reflect.InvocationTargetException
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$6(TreeNode.scala:762)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:761)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:718)
  at org.apache.spark.sql.execution.SparkPlan.super$makeCopy(SparkPlan.scala:102)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$makeCopy$1(SparkPlan.scala:102)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:64)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:231)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:583)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:508)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:561)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:561)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:261)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:256)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:401)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:839)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:798)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:807)
{code}

The trace indicates the failure occurs while AQE prepares query stages: {{AdaptiveSparkPlanExec.createQueryStages}} triggers plan canonicalization, which calls {{TreeNode.makeCopy}} (apparently on the {{DistanceJoin}} node), and the reflective constructor invocation throws.

The query plan printed by {{.explain()}} is:

{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=47]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- DistanceJoin geometry#20: geometry, geometry#40: geometry, Inner, k_distance#25: double, true, INTERSECTS, false, ( **org.apache.spark.sql.sedona_sql.expressions.ST_Distance** <= k_distance#25), ( **org.apache.spark.sql.sedona_sql.expressions.ST_Distance** <= k_distance#25)
               :- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point** AS geometry#20, rand(1938374972112629094) AS k_distance#25]
               :  +- Project [x#13, rand(2628366806535957819) AS y#16]
               :     +- Project [rand(2558384822432534253) AS x#13]
               :        +- Range (0, 10, step=1, splits=10)
               +- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point** AS geometry#40]
                  +- Project [x#33, rand(5835597603278804781) AS y#36]
                     +- Project [rand(-5242910633616757400) AS x#33]
                        +- Range (0, 10, step=1, splits=10)
{code}

If we disable AQE, the problem cannot be reproduced.
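Until the fix ships in 1.6.1, a minimal workaround sketch based on the observation above is to disable AQE for the session before running the distance join. This assumes the repro above has already been executed up to the definition of {{neighborsDf}}; {{spark.sql.adaptive.enabled}} is the standard Spark runtime conf controlling AQE.

{code:scala}
// Workaround sketch: with AQE off, AdaptiveSparkPlanExec never canonicalizes
// the DistanceJoin node, so the failing reflective makeCopy is not reached.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// The same query now completes instead of throwing InvocationTargetException.
neighborsDf.selectExpr("COUNT(*)").show()
{code}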