To whom it may concern, we observed the following Sedona behaviour and would like to ask your opinion on how we can optimize it.
Our aim is to perform an inner spatial join between a points_df and a polygons_df, matching each point in points_df to the polygon from polygons_df that contains it. Below are more details about the two dataframes we are considering:
- points_df: contains 50 million events, with latitude and longitude rounded to the third decimal digit;
- polygons_df: contains 10k multi-polygons with 600 vertices on average.

The issue we are reporting is a very long computing time: the spatial join query never completes, even when running on a cluster with 40 workers of 4 cores each. No error is printed by the driver, but we receive the following warning:

WARN org.apache.sedona.core.spatialOperator.JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.

We were able to run the same spatial join successfully when considering only a very small sample of events. Do you have any suggestion on how we can achieve the same result on higher volumes of data, or is there a way we can optimize the join? Attached you can find the pseudo-code we are running.

Looking forward to hearing from you.

Kind regards,
Pietro Greselin
def toSpatialPoints(df, lat_column_name, lon_column_name):
    df.createOrReplaceTempView("points_df")
    # ST_Point takes (x, y), i.e. (longitude, latitude)
    return (
        spark.sql("SELECT *, ST_Point({}, {}) AS point FROM points_df".format(lon_column_name, lat_column_name))
        .drop(lat_column_name, lon_column_name)
    )

def toSpatialPolygons(df, wkt_column_name):
    df.createOrReplaceTempView("polygons_df")
    return (
        spark.sql("SELECT *, ST_GeomFromWKT({}) AS polygon FROM polygons_df".format(wkt_column_name))
        .drop(wkt_column_name)
    )

def sJoin(polygons_df, points_df):
    polygons_df.createOrReplaceTempView('polygons_df')
    points_df.createOrReplaceTempView('points_df')
    return spark.sql(
        "SELECT * FROM polygons_df, points_df "
        "WHERE ST_Contains(polygons_df.polygon, points_df.point)"
    )

maps = spark.read.parquet(maps_path).select('AREA_ID', 'WKT')
polygons_df = toSpatialPolygons(maps, 'WKT')

events = spark.read.parquet(events_path).select('ID', 'LATITUDE', 'LONGITUDE')
points_df = toSpatialPoints(events, 'LATITUDE', 'LONGITUDE')

spatial_join = sJoin(polygons_df, points_df)
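For completeness, one direction we are considering to address the "no index exists" warning is setting Sedona's join and index options at session creation. This is only a sketch: the sedona.* keys reflect our reading of the Sedona tuning documentation, and the partition count (160 = 40 workers x 4 cores) is purely our assumption for this cluster. Is this the right direction?

```python
from pyspark.sql import SparkSession

# Sketch of session-level Sedona tuning (assumed key names from the
# Sedona tuning docs; 160 partitions is our guess to match total cores).
spark = (
    SparkSession.builder
    .appName("points-in-polygons")
    .config("sedona.global.index", "true")          # pre-build per-partition spatial indexes
    .config("sedona.global.indextype", "quadtree")  # index type: quadtree or rtree
    .config("sedona.join.gridtype", "kdbtree")      # spatial partitioning grid for the join
    .config("sedona.join.numpartition", "160")      # assumed: match total executor cores
    .getOrCreate()
)
```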