To whom it may concern,
we are reporting the following Sedona behaviour and would like to ask your
opinion on how we can optimize it.
Our aim is to perform an inner spatial join between a points_df and a
polygons_df, matching each point in points_df with the polygons from
polygons_df that contain it.
Below you can find more details about the two dataframes we are considering:
- points_df: contains 50 million events, with latitude and longitude
rounded to the third decimal digit;
- polygons_df: contains 10k multi-polygons with 600 vertices on average.
The issue we are reporting is a very long computation time: the spatial
join query never completes, even when running on a cluster with 40 workers
of 4 cores each.
No error is printed by the driver, but we receive the following
warning:
WARN org.apache.sedona.core.spatialOperator.JoinQuery: UseIndex is true,
but no index exists. Will build index on the fly.
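For reference, these are the join-related parameters we believe Sedona
exposes (names taken from the Sedona SQL parameter documentation; the
values below are guesses on our side, so please correct us if we are
misreading them):

# Sketch of the configuration we are considering; values are hypothetical.
spark.conf.set("sedona.global.index", "true")       # use a spatial index in join queries
spark.conf.set("sedona.global.indextype", "rtree")  # index type: rtree or quadtree
spark.conf.set("sedona.join.gridtype", "kdbtree")   # spatial partitioning grid
spark.conf.set("sedona.join.numpartition", "160")   # hypothetical: 40 workers x 4 cores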
We were able to run the same spatial join successfully when considering
only a very small sample of events.
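To be concrete, using the functions from the attached pseudo-code, a run
like the following does complete (the sample fraction here is purely
illustrative):

# Hypothetical small-sample run that completes at this scale.
sample_points_df = points_df.sample(fraction=0.0001, seed=42)
sJoin(polygons_df, sample_points_df).count()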
Do you have any suggestions on how we can achieve the same result on higher
volumes of data, or is there a way we can optimize the join?
Attached you can find the pseudo-code we are running.
Looking forward to hearing from you.
Kind regards,
Pietro Greselin
from sedona.register import SedonaRegistrator

# Assumes an existing SparkSession named `spark`; Sedona's SQL functions
# (ST_Point, ST_GeomFromWKT, ST_Contains) must be registered once.
SedonaRegistrator.registerAll(spark)

def toSpatialPoints(df, lat_column_name, lon_column_name):
    # Build a point geometry from the coordinate columns;
    # note ST_Point takes (x, y) = (lon, lat).
    df.createOrReplaceTempView("points_df")
    return (
        spark.sql("SELECT *, ST_Point({}, {}) AS point FROM points_df"
                  .format(lon_column_name, lat_column_name))
        .drop(lat_column_name, lon_column_name)
    )
def toSpatialPolygons(df, wkt_column_name):
    # Parse the WKT column into a (multi-)polygon geometry.
    df.createOrReplaceTempView("polygons_df")
    return (
        spark.sql("SELECT *, ST_GeomFromWKT({}) AS polygon FROM polygons_df"
                  .format(wkt_column_name))
        .drop(wkt_column_name)
    )
def sJoin(polygons_df, points_df):
    # Inner join on spatial containment: keep (polygon, point) pairs
    # where the polygon contains the point.
    polygons_df.createOrReplaceTempView('polygons_df')
    points_df.createOrReplaceTempView('points_df')
    return spark.sql(
        "SELECT * FROM polygons_df, points_df "
        "WHERE ST_Contains(polygons_df.polygon, points_df.point)"
    )
maps = spark.read.parquet(maps_path).select('AREA_ID', 'WKT')
polygons_df = toSpatialPolygons(maps, 'WKT')

events = spark.read.parquet(events_path).select('ID', 'LATITUDE', 'LONGITUDE')
points_df = toSpatialPoints(events, 'LATITUDE', 'LONGITUDE')

spatial_join = sJoin(polygons_df, points_df)
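One direction we are evaluating, given that the polygons side is only ~10k
rows, is a broadcast spatial join (assuming our Sedona version supports the
broadcast hint for spatial predicates); a minimal sketch:

from pyspark.sql.functions import broadcast, expr

# Sketch only: broadcast the small polygons side so every executor can
# index it locally instead of shuffling the 50 million points.
spatial_join_bc = points_df.alias("points").join(
    broadcast(polygons_df.alias("polygons")),
    expr("ST_Contains(polygons.polygon, points.point)"),
)

Would this be a sensible approach at this scale?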