To whom it may concern,

we would like to report the following Sedona behaviour and ask your
opinion on how we can optimize it.

Our aim is to perform an inner spatial join between a points_df and a
polygons_df, matching a point in points_df whenever it is contained in a
polygon from polygons_df.
Below you can find more details about the two dataframes we are considering:
- points_df: it contains 50 million events with latitude and longitude
rounded to the third decimal digit (see the sketch after this list);
- polygons_df: it contains 10k multi-polygons with 600 vertices on average.
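
Because the coordinates are rounded to the third decimal digit, many events
share the same point. One workaround we are considering (our own idea, not
something we have validated) is to run the spatial join on the distinct
coordinate pairs only, and then map the result back onto the full 50 million
events with a plain equi-join. The names below match the attached pseudo-code:

from pyspark.sql.functions import expr

# distinct rounded coordinates: far fewer rows than the 50M raw events
coords = (
    events.select('LATITUDE', 'LONGITUDE').distinct()
    .withColumn('point', expr('ST_Point(LONGITUDE, LATITUDE)'))
)

# spatial join on the (much smaller) set of distinct points...
coord_to_area = sJoin(polygons_df, coords).select('LATITUDE', 'LONGITUDE', 'AREA_ID')

# ...then a cheap equi-join to attach AREA_ID back to every event
enriched = events.join(coord_to_area, ['LATITUDE', 'LONGITUDE'])

Would this be a reasonable direction?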

The issue we are reporting is a very long computation time: the spatial
join query never completes, even when running on a cluster with 40 workers
with 4 cores each.
No error is printed by the driver, but we receive the following
warning:
WARN org.apache.sedona.core.spatialOperator.JoinQuery: UseIndex is true,
but no index exists. Will build index on the fly.
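
The warning seems to come from the core JoinQuery operator, so we also
wondered whether we are supposed to build the spatial partitioning and the
index explicitly through the RDD API. Below is a sketch of our understanding
of that route (the Adapter/JoinQuery calls reflect our reading of the Python
docs and have not been validated by us):

from sedona.utils.adapter import Adapter
from sedona.core.enums import GridType, IndexType
from sedona.core.spatialOperator import JoinQuery

# convert the dataframes from the attached pseudo-code to spatial RDDs
point_rdd = Adapter.toSpatialRdd(points_df, "point")
polygon_rdd = Adapter.toSpatialRdd(polygons_df, "polygon")
point_rdd.analyze()
polygon_rdd.analyze()

# partition both sides on the same grid, then index the partitioned points
point_rdd.spatialPartitioning(GridType.KDBTREE)
polygon_rdd.spatialPartitioning(point_rdd.getPartitioner())
point_rdd.buildIndex(IndexType.RTREE, True)  # True = build on the partitioned RDD

# flat (polygon, point) pairs, considering boundary intersection
pairs = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, True)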

We were able to run the same spatial join successfully when considering
only a very small sample of events.
Do you have any suggestion on how we can achieve the same result on higher
volumes of data, or is there a way we can optimize the join?
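
For example, given that polygons_df has only 10k rows, would hinting Spark to
broadcast it be the recommended route? A minimal sketch of what we have in
mind (assuming Sedona can turn the hint into a broadcast index join):

from pyspark.sql.functions import broadcast, expr

# broadcast the small polygon side so every executor can probe it locally
joined = points_df.join(
    broadcast(polygons_df),
    expr("ST_Contains(polygon, point)"),
)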

Below you can find the pseudo-code we are running.

Looking forward to hearing from you.

Kind regards,
Pietro Greselin
# Sedona SQL functions (ST_Point, ST_GeomFromWKT, ST_Contains) are
# registered up front on the SparkSession:
from sedona.register import SedonaRegistrator
SedonaRegistrator.registerAll(spark)

def toSpatialPoints(df, lat_column_name, lon_column_name):
    # build a point geometry from each coordinate pair (x = longitude, y = latitude)
    df.createOrReplaceTempView("points_df")
    return (
        spark.sql(
            "SELECT *, ST_Point({}, {}) AS point FROM points_df"
            .format(lon_column_name, lat_column_name)
        )
        .drop(lat_column_name, lon_column_name)
    )

def toSpatialPolygons(df, wkt_column_name):
    # parse the WKT strings into polygon geometries
    df.createOrReplaceTempView("polygons_df")
    return (
        spark.sql(
            "SELECT *, ST_GeomFromWKT({}) AS polygon FROM polygons_df"
            .format(wkt_column_name)
        )
        .drop(wkt_column_name)
    )

def sJoin(polygons_df, points_df):
    # inner join: keep every (polygon, point) pair where the polygon contains the point
    polygons_df.createOrReplaceTempView('polygons_df')
    points_df.createOrReplaceTempView('points_df')
    return spark.sql(
        "SELECT * FROM polygons_df, points_df "
        "WHERE ST_Contains(polygons_df.polygon, points_df.point)"
    )


maps = spark.read.parquet(maps_path).select('AREA_ID', 'WKT')
polygons_df = toSpatialPolygons(maps, 'WKT')

events = spark.read.parquet(events_path).select('ID', 'LATITUDE', 'LONGITUDE')
points_df = toSpatialPoints(events, 'LATITUDE', 'LONGITUDE')

spatial_join = sJoin(polygons_df, points_df)
