Kristin Cowalcijk created SEDONA-186:
----------------------------------------

             Summary: `collect` result of a spatial join query with SELECT * 
fails with serde error
                 Key: SEDONA-186
                 URL: https://issues.apache.org/jira/browse/SEDONA-186
             Project: Apache Sedona
          Issue Type: Bug
    Affects Versions: 1.2.1
            Reporter: Kristin Cowalcijk


h2. The Problem

The SQL query that reproduced this bug was:
{code:scala}
scala> spark.sql("SELECT * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect

22/11/01 18:41:08 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
java.lang.RuntimeException: Error while decoding: org.locationtech.jts.io.ParseException: Unknown WKB type 184
createexternalrow(input[0, bigint, true], input[1, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, input[3, string, true].toString, input[4, string, true].toString, input[5, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, StructField(id,LongType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true), StructField(location,StringType,true), StructField(quad_key,StringType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true))
  at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:172)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
  ... 47 elided
Caused by: org.locationtech.jts.io.ParseException: Unknown WKB type 184
  at org.locationtech.jts.io.WKBReader.readGeometry(WKBReader.java:282)
  at org.locationtech.jts.io.WKBReader.read(WKBReader.java:191)
  at org.locationtech.jts.io.WKBReader.read(WKBReader.java:159)
  at org.apache.sedona.sql.utils.GeometrySerializer$.deserialize(GeometrySerializer.scala:49)
  at org.apache.spark.sql.sedona_sql.UDT.GeometryUDT.deserialize(GeometryUDT.scala:42)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
  ... 65 more
{code}

Here, {{osm_all_nodes_geom}} is a DataFrame with 3 columns:

{code:scala}
scala> spark.sql("SELECT * FROM osm_all_nodes_geom").printSchema
root
 |-- id: long (nullable = true)
 |-- wkb: binary (nullable = true)
 |-- geom: geometry (nullable = true)
{code}

{{ms_buildings_geom}} is a DataFrame with 4 columns:

{code:scala}
scala> spark.sql("SELECT * FROM ms_buildings_geom").printSchema
root
 |-- location: string (nullable = true)
 |-- quad_key: string (nullable = true)
 |-- wkb: binary (nullable = true)
 |-- geom: geometry (nullable = true)
{code}

h2. Other findings

# This problem only reproduces when running spatial join queries with {{SELECT *}}. If we select particular columns instead, the problem goes away.
# {{.show}} runs perfectly fine, while {{collect}} or {{takeAsList}} runs into this problem.
# This problem cannot be reproduced by running a broadcast join, such as
{code:scala}
spark.sql("SELECT /*+ BROADCAST(msb) */ * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
{code}

h2. Cause of this problem

When the join query optimizer replaces a join node with a {{RangeJoinExec}} or {{DistanceJoinExec}} node, the left and right sub-plans of the join may get swapped. The output of {{RangeJoinExec}} or {{DistanceJoinExec}} then becomes {{rightOutput ++ leftOutput}} instead of {{leftOutput ++ rightOutput}}. This leads to column type inconsistency when deserializing the result rows of the join query: the decoder reads each column's bytes using the type at that position in the expected schema, so binary or string bytes end up being parsed as serialized geometry, producing the {{Unknown WKB type}} error above.
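
The effect of the swapped ordering can be illustrated with the two schemas above. Below is a minimal pure-Scala sketch; the {{Attr}} class and value names are hypothetical, for illustration only, and are not Sedona's actual classes:

{code:scala}
// Toy model of the bug: the row decoder expects columns in
// leftOutput ++ rightOutput order, but the swapped join node emits
// rightOutput ++ leftOutput, so one column's bytes are decoded as
// another column's type (e.g. arbitrary bytes parsed as WKB geometry).
case class Attr(name: String, dataType: String)

val leftOutput = Seq(
  Attr("id", "long"), Attr("wkb", "binary"), Attr("geom", "geometry"))
val rightOutput = Seq(
  Attr("location", "string"), Attr("quad_key", "string"),
  Attr("wkb", "binary"), Attr("geom", "geometry"))

val expected = leftOutput ++ rightOutput // schema of SELECT *
val actual   = rightOutput ++ leftOutput // output of the swapped join node

// Report every position whose type no longer matches the expected schema.
val mismatches = expected.zip(actual).zipWithIndex.collect {
  case ((e, a), i) if e.dataType != a.dataType =>
    s"column $i: expected ${e.dataType}, got ${a.dataType}"
}
mismatches.foreach(println)
{code}

Five of the seven columns end up with mismatched types; in particular the first geometry position (column 2) receives a binary {{wkb}} column, which is consistent with the deserializer failing on bytes that are not valid serialized geometries.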



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
