Kristin Cowalcijk created SEDONA-624:
----------------------------------------

             Summary: Distance join throws java.lang.reflect.InvocationTargetException when working with AQE
                 Key: SEDONA-624
                 URL: https://issues.apache.org/jira/browse/SEDONA-624
             Project: Apache Sedona
          Issue Type: Bug
    Affects Versions: 1.6.0
            Reporter: Kristin Cowalcijk
             Fix For: 1.6.1


This is a minimal snippet for reproducing the problem:

{code:scala}
import org.apache.spark.sql.functions.{col, expr, rand}
import org.apache.spark.sql.sedona_sql.expressions.st_functions.ST_Distance

// Force the partitioned distance join path instead of a broadcast join.
spark.conf.set("sedona.join.autoBroadcastJoinThreshold", "-1")

val kDistanceDf = spark.range(0, 10)
  .withColumn("x", rand())
  .withColumn("y", rand())
  .withColumn("geometry", expr("st_point(x, y)"))
  .withColumn("k_distance", rand())
val formattedDataframe = spark.range(0, 10)
  .withColumn("x", rand())
  .withColumn("y", rand())
  .withColumn("geometry", expr("st_point(x, y)"))
val neighborsDf = kDistanceDf.alias("l").join(formattedDataframe.alias("r"),
  ST_Distance(col("l.geometry"), col("r.geometry")) <= col("l.k_distance"))
neighborsDf.selectExpr("COUNT(*)").explain()
neighborsDf.selectExpr("COUNT(*)").show()
{code}
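
The snippet assumes a session with Sedona's types and functions already registered. For anyone reproducing this, a minimal setup sketch using the {{SedonaContext}} API (builder options here are placeholders for a local run):

{code:scala}
import org.apache.sedona.spark.SedonaContext

// Build a local session and register Sedona's geometry UDT and ST_* functions.
val config = SedonaContext.builder()
  .master("local[*]") // assumption: local run; any master works
  .getOrCreate()
val spark = SedonaContext.create(config)
{code}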

The {{.show()}} call throws an exception:

{code}
java.lang.reflect.InvocationTargetException was thrown.
java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$6(TreeNode.scala:762)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:761)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:718)
        at org.apache.spark.sql.execution.SparkPlan.super$makeCopy(SparkPlan.scala:102)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$makeCopy$1(SparkPlan.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:64)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:231)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:583)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:581)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:581)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:567)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:566)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:508)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:561)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:561)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:261)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:256)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:401)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:839)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:798)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:807)
{code}
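
Note that {{java.lang.reflect.InvocationTargetException}} is only the wrapper thrown by {{Constructor.newInstance}}; the root failure raised inside the reflectively invoked constructor sits in the cause chain. A quick sketch for surfacing it, assuming the repro above:

{code:scala}
// Unwrap the reflection wrapper to see the underlying constructor failure.
try {
  neighborsDf.selectExpr("COUNT(*)").show()
} catch {
  case e: Throwable =>
    var cause: Throwable = e
    while (cause.getCause != null) cause = cause.getCause
    cause.printStackTrace() // the actual error hidden behind InvocationTargetException
}
{code}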

The query plan printed by {{.explain()}} is:

{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=47]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- DistanceJoin geometry#20: geometry, geometry#40: geometry, Inner, k_distance#25: double, true, INTERSECTS, false, ( **org.apache.spark.sql.sedona_sql.expressions.ST_Distance**   <= k_distance#25), ( **org.apache.spark.sql.sedona_sql.expressions.ST_Distance**   <= k_distance#25)
               :- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS geometry#20, rand(1938374972112629094) AS k_distance#25]
               :  +- Project [x#13, rand(2628366806535957819) AS y#16]
               :     +- Project [rand(2558384822432534253) AS x#13]
               :        +- Range (0, 10, step=1, splits=10)
               +- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS geometry#40]
                  +- Project [x#33, rand(5835597603278804781) AS y#36]
                     +- Project [rand(-5242910633616757400) AS x#33]
                        +- Range (0, 10, step=1, splits=10)
{code}

If we disable AQE, the problem cannot be reproduced. The stack trace points at plan canonicalization driven by {{AdaptiveSparkPlanExec.createQueryStages}}: {{TreeNode.makeCopy}} rebuilds the {{DistanceJoin}} node reflectively, and the constructor invocation fails with the wrapped exception.
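
As a workaround until the fix ships in 1.6.1, AQE can be turned off for the session before running the join (standard Spark config):

{code:scala}
// Workaround sketch: disable Adaptive Query Execution so the plan never goes
// through the AdaptiveSparkPlanExec canonicalization path shown in the trace.
spark.conf.set("spark.sql.adaptive.enabled", "false")
{code}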



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
