Hi. The spatial indexes built when running a spatial join through the Spark SQL API are transient, and there is no way to reuse them. To reuse spatially partitioned and indexed RDDs, we have to use the RDD API. Please refer to https://sedona.apache.org/latest-snapshot/tutorial/rdd/#write-a-spatial-join-query for details.

Regards,
Kristin
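For reference, a minimal sketch of that RDD-based approach, assuming two input DataFrames named zonesDf and tripsDf, each with a geometry column named geom (these names, the grid/index types, and the storage level are illustrative, not taken from the thread):

import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.sedona.sql.utils.Adapter
import org.apache.spark.storage.StorageLevel

// Convert the DataFrames to SpatialRDDs so the partitioning and index can outlive a single query.
val zoneRdd = Adapter.toSpatialRdd(zonesDf, "geom")   // hypothetical DataFrame
val tripRdd = Adapter.toSpatialRdd(tripsDf, "geom")   // hypothetical DataFrame
zoneRdd.analyze()
tripRdd.analyze()

// Spatially partition one side, then reuse its partitioner for the other side.
tripRdd.spatialPartitioning(GridType.KDBTREE)
zoneRdd.spatialPartitioning(tripRdd.getPartitioner)

// Build the index on the spatially partitioned data and persist both sides,
// so later DAG stages can reuse them instead of repartitioning and reindexing.
zoneRdd.buildIndex(IndexType.RTREE, true)
zoneRdd.indexedRDD.persist(StorageLevel.MEMORY_ONLY)
tripRdd.spatialPartitionedRDD.persist(StorageLevel.MEMORY_ONLY)

// Join the cached, indexed RDDs; the same RDDs can be joined again later in the job.
val usingIndex = true
val considerBoundaryIntersection = true
val pairs = JoinQuery.SpatialJoinQueryFlat(zoneRdd, tripRdd, usingIndex, considerBoundaryIntersection)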
On Mon, Mar 27, 2023 at 12:18 PM Trang Nguyen <[email protected]> wrote:

The union worked like a charm, thanks again. Another question: since I would like to use the spatial indexes for other DAG phases, how can I cache them using the DataFrame API?

Trang

-----Original Message-----
From: Trang Nguyen <[email protected]>
Sent: Wednesday, March 22, 2023 8:59 PM
To: [email protected]
Subject: RE: Performance degradation for spatial join in Spark during task processing

Hi Kristin,

Thanks so much for the quick response and excellent explanation. I've observed the same behavior in 1.3.1 as well, so it is very likely due to data skew as you pointed out. I will try out your suggestions to see whether they improve the behavior.

Best,
Trang

From: Kristin Cowalcijk <[email protected]>
Sent: Wednesday, March 22, 2023 8:12 PM
To: [email protected]
Subject: Re: Performance degradation for spatial join in Spark during task processing

Hi.

If 1.4.0 is slower than 1.3.1-incubating when running this join, the slowdown might be caused by a behavior change introduced in 1.4.0. If the join was already slow in 1.3.1-incubating and the situation has not improved in 1.4.0, then we can see whether some knobs can be tuned to make it run efficiently. I cannot see the images attached to your original email, so I may be missing important information; if that is the case, please let me know.

*Situation 1: 1.4.0 is slower than 1.3.1-incubating.* There is a behavior change in 1.4.0: when the "left" or "right" side of the relation is referred to in Sedona configurations such as sedona.join.spartitionside and sedona.join.indexbuildside, we now always stick to the SQL semantics of the join. Please refer to SEDONA-260 <https://issues.apache.org/jira/browse/SEDONA-260> for a detailed explanation of this change.
If your query was affected by this change, you can switch back to the old behavior using the following configurations:

sparkSession.conf.set("sedona.join.spartitionside", "right")
sparkSession.conf.set("sedona.join.indexbuildside", "right")

*Situation 2: The join is already slow in 1.3.1-incubating.* According to your description, the workload is quite unbalanced (3,000 of the 4,102 tasks finished very quickly, while the final 100 tasks take over 1 hour each to complete). The spatial partitioning we have may not produce a balanced workload for the join stage. There are several possible reasons.

1. The extents of the joined datasets differ too much. For example, if geom comes from a continent-scale dataset while start_lon/start_lat come from a city-scale dataset, then partitioning geom evenly will not produce a balanced workload for the join stage; partitioning the start_lon dataset would yield better performance in this case. You can choose which relation gets spatially partitioned using sparkSession.conf.set("sedona.join.spartitionside", "right" /* or "left", depending on the current partition side */).

2. The dataset may also be hard to partition. If we partition the start_lon side, the result of evaluating ST_collect(ST_Point(start_lon, start_lat), ST_Point(end_lon, end_lat)) is what gets partitioned. If start locations are far away from end locations, we end up partitioning a set of geometries with many large, overlapping bounding boxes, which makes it hard to get good join performance. In this particular case you can perform two separate joins (intersect with the start location, intersect with the end location) and then union the results; this will hopefully perform better than joining on the multipoints directly (see the sketch after this message).

3. You've mentioned that disabling the global index improves job stability. That may be caused by poor spatial partitioning on the index build side, or the number of partitions may need to be increased so the index fits into executor memory. I've also found that using an R-tree for indexing helps a lot in some cases; you can try it out by setting spark.conf.set("sedona.global.indextype", "rtree"). Without the global index, the join task for each partition runs as a nested loop, which can be painfully slow, so it is better to leave it enabled in most cases.

Regards,
Kristin
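As a reference for the two-join-and-union approach in point 2 above, here is a minimal sketch, assuming the two inputs are registered as temporary views named zones (with columns id and geom) and trips (with columns trip_id, start_lon, start_lat, end_lon, end_lat); these names are illustrative, not from the thread:

// Join the zone geometries against start and end points separately; each join
// partitions plain points instead of wide multi-point envelopes, so the spatial
// partitioning stays much more balanced.
val startMatches = sparkSession.sql(
  """SELECT z.id, t.trip_id
    |FROM zones z JOIN trips t
    |  ON ST_Intersects(z.geom, ST_Point(t.start_lon, t.start_lat))""".stripMargin)

val endMatches = sparkSession.sql(
  """SELECT z.id, t.trip_id
    |FROM zones z JOIN trips t
    |  ON ST_Intersects(z.geom, ST_Point(t.end_lon, t.end_lat))""".stripMargin)

// Union the two results and drop duplicate pairs (trips whose start and end fall in the same zone).
val matches = startMatches.union(endMatches).distinct()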
On Thu, Mar 23, 2023 at 8:22 AM Trang Nguyen <[email protected]> wrote:

Hi,

I've upgraded from Sedona 1.3.1-incubating to 1.4.0 but am still seeing a significant slowdown in spatial joins as task processing progresses. For instance, with a spatial partition count of 4102, the first 3000 tasks complete in under 3 seconds each, but later tasks get progressively worse for the same amount of shuffle read records. The final 100 tasks take over 1 hour each to complete.

I've tried disabling the global index, which seemed to reduce failures, but I am still seeing the performance degradation:

// Sedona indexing properties
sparkSession.conf.set("sedona.join.gridtype", "kdbtree")
sparkSession.conf.set("sedona.global.index", "false")
sparkSession.conf.set("sedona.join.indexbuildside", "left")

if (appConf.getNrInputPartitions > 0) {
  sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getNrInputPartitions.toString)
  sparkSession.conf.set("sedona.join.numpartition", appConf.getNrInputPartitions.toString)
  sparkSession.conf.set("spark.default.parallelism", appConf.getNrInputPartitions.toString)
  LOG.info(s"Set spark.default.parallelism, spark.sql.shuffle.partitions to: ${appConf.getNrInputPartitions}")
}

I am using a RangeJoin with the DataFrame API:

st_intersects(geom, ST_collect(ST_Point(start_lon, start_lat), ST_Point(end_lon, end_lat)))

Is this a bug, or are there steps or settings I could use to get more stable performance?

Thanks,
Trang
