Doug Dennis created SEDONA-231:
----------------------------------
Summary: Redundant Serde Removal
Key: SEDONA-231
URL: https://issues.apache.org/jira/browse/SEDONA-231
Project: Apache Sedona
Issue Type: Improvement
Reporter: Doug Dennis
Currently, Geometry objects are deserialized and reserialized during every
evaluation of a function on a row in Spark. This amounts to a great deal of
redundant serde during query execution. At times, objects are serialized just
to be immediately deserialized.
To demonstrate this in action, I placed print statements in the
GeometrySerializer serialize and deserialize methods, the GeometryUDT serialize
and deserialize methods, and in the eval methods of several functions. When the
following is executed:
{noformat}
import org.locationtech.jts.geom.Geometry

val columns = Seq("input", "blade")
val data = Seq(
  ("GEOMETRYCOLLECTION ( LINESTRING (0 0, 1.5 1.5, 2 2), LINESTRING (3 3, 4.5 4.5, 5 5))",
   "MULTIPOINT (0.5 0.5, 1 1, 3.5 3.5, 4 4)")
)
val df = spark.createDataFrame(data).toDF(columns: _*)
// Nest three Sedona functions so that each one feeds its output to the next.
println(
  df.selectExpr("ST_Normalize(ST_Split(ST_GeomFromWKT(input), ST_GeomFromWKT(blade))) AS result")
    .collect()(0).get(0).asInstanceOf[Geometry].toText()
)
{noformat}
I get the following output:
{noformat}
**org.apache.spark.sql.sedona_sql.expressions.ST_Normalize**
**org.apache.spark.sql.sedona_sql.expressions.ST_Split**
**org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
Inside GeometrySerializer.serialize
Inside GeometrySerializer.serialize
Inside GeometrySerializer.serialize
Inside GeometrySerializer.deserialize
**org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
Inside GeometrySerializer.serialize
Inside GeometrySerializer.deserialize
Inside GeometrySerializer.serialize
Inside GeometrySerializer.deserialize
Inside GeometrySerializer.serialize
Inside UDT deserialize.
Inside GeometrySerializer.deserialize
MULTILINESTRING ((0 0, 0.5 0.5), (0.5 0.5, 1 1), (1 1, 1.5 1.5, 2 2), (3 3, 3.5 3.5), (3.5 3.5, 4 4), (4 4, 4.5 4.5, 5 5))
{noformat}
To explain what is happening:
# ST_Normalize.eval is called.
# ST_Normalize.eval calls ST_Split.eval.
# ST_Split.eval first evaluates the ST_GeomFromWKT whose input is the
GEOMETRYCOLLECTION WKT.
# ST_GeomFromWKT processes the WKT string and generates a Geometry object.
# The Geometry object is passed to GeometrySerializer.serialize. This is the
first call to serialize.
# This object is a GeometryCollection, which GeometrySerializer handles
recursively, so two additional calls to serialize appear.
# The GeometryCollection is then immediately deserialized and returned to
ST_Split.
# The second ST_GeomFromWKT is called (this one has a MULTIPOINT wkt).
# ST_GeomFromWKT processes the WKT and then serializes the geometry.
# That geometry is immediately deserialized and returned to ST_Split.
# ST_Split performs its operation and then serializes the geometry.
# That geometry is then immediately deserialized and returned to ST_Normalize.
# ST_Normalize normalizes the geometry object and then serializes it as the
final result.
# Finally, GeometryUDT.deserialize is called to handle the collect call, and
it in turn calls GeometrySerializer.deserialize.
Of the four expression results serialized here (both ST_GeomFromWKT calls,
ST_Split, and ST_Normalize), three are deserialized again immediately so the
parent expression can operate on them. Each of those round trips is wasted
work.
I propose eliminating this redundancy through the following steps.
* Create a trait called SerdeAware which has a single method called
doNotSerializeOutput.
* This trait is then added to the InferredUnaryExpression and
InferredBinaryExpression abstract classes.
* When doNotSerializeOutput is called on one of the expression classes, a
serializeOutput flag is set to false.
* That flag is read in the class's eval method.
* If the flag is false, eval returns the Geometry object unserialized; if the
flag is true, the output is serialized as it is today.
* Finally, the buildExtractor method of the InferredTypes object is modified
to detect whether the input expression is SerdeAware and, if it is, to call
doNotSerializeOutput before calling the input expression's eval method.
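As a rough illustration of the steps above, here is a toy model that uses no Spark or Sedona classes. Only SerdeAware and doNotSerializeOutput come from the proposal; Geom, ToySerializer, ToyExpr, InferredToyExpr, FromWkt, Unary, extract, optimizationEnabled, and run are hypothetical stand-ins invented for this sketch, so treat it as one possible shape of the idea rather than the actual implementation.

```scala
// Toy model of the proposal. No Spark or Sedona classes are used; every name
// except SerdeAware and doNotSerializeOutput is a hypothetical stand-in.
object SerdeSketch {
  final case class Geom(wkt: String) // stand-in for a JTS Geometry

  // Stand-in for GeometrySerializer that counts its calls.
  object ToySerializer {
    var serializeCalls = 0
    var deserializeCalls = 0
    def serialize(g: Geom): Array[Byte] = {
      serializeCalls += 1
      g.wkt.getBytes("UTF-8")
    }
    def deserialize(bytes: Array[Byte]): Geom = {
      deserializeCalls += 1
      Geom(new String(bytes, "UTF-8"))
    }
  }

  trait ToyExpr { def eval(): Any } // stand-in for a Spark Expression

  trait SerdeAware { def doNotSerializeOutput(): Unit }

  // Toggle used only to compare behavior with and without the change.
  var optimizationEnabled = true

  // Stand-in for the extractor built by InferredTypes.buildExtractor.
  def extract(input: ToyExpr): Geom = {
    input match {
      // Ask a SerdeAware input to hand back the raw object.
      case aware: SerdeAware if optimizationEnabled => aware.doNotSerializeOutput()
      case _ => ()
    }
    input.eval() match {
      case g: Geom => g                                           // already an object
      case bytes: Array[Byte] => ToySerializer.deserialize(bytes) // fallback path
    }
  }

  // Stand-in for the InferredUnaryExpression / InferredBinaryExpression base classes.
  abstract class InferredToyExpr extends ToyExpr with SerdeAware {
    private var serializeOutput = true
    override def doNotSerializeOutput(): Unit = { serializeOutput = false }
    protected def compute(): Geom
    override def eval(): Any = {
      val result = compute()
      if (serializeOutput) ToySerializer.serialize(result) else result
    }
  }

  class FromWkt(wkt: String) extends InferredToyExpr {
    protected def compute(): Geom = Geom(wkt)
  }

  class Unary(child: ToyExpr, f: Geom => Geom) extends InferredToyExpr {
    protected def compute(): Geom = f(extract(child))
  }

  // Evaluates a three-expression chain and returns (serialize calls, deserialize calls).
  def run(optimize: Boolean): (Int, Int) = {
    optimizationEnabled = optimize
    ToySerializer.serializeCalls = 0
    ToySerializer.deserializeCalls = 0
    val inner = new Unary(new FromWkt("POINT (1 1)"), g => g)
    val outer = new Unary(inner, g => g)
    outer.eval()
    (ToySerializer.serializeCalls, ToySerializer.deserializeCalls)
  }
}
```

In this toy chain, SerdeSketch.run(optimize = false) counts three serializations and two deserializations, while SerdeSketch.run(optimize = true) counts one serialization and none, because only the outermost expression still serializes its output.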
With a test implementation of these changes, the same query produced the following output:
{noformat}
**org.apache.spark.sql.sedona_sql.expressions.ST_Normalize**
**org.apache.spark.sql.sedona_sql.expressions.ST_Split**
**org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
**org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
Inside GeometrySerializer.serialize
Inside UDT deserialize.
Inside GeometrySerializer.deserialize
MULTILINESTRING ((0 0, 0.5 0.5), (0.5 0.5, 1 1), (1 1, 1.5 1.5, 2 2), (3 3, 3.5 3.5), (3.5 3.5, 4 4), (4 4, 4.5 4.5, 5 5))
{noformat}
You can see that only a single serialization occurred, at the very end of the
computation, followed by the one deserialization needed for the collect call.