Hi,

If 1.4.0 is slower than 1.3.1-incubating when running this join, the performance degradation may be caused by a behavior change introduced in 1.4.0. If the join was already slow in 1.3.1-incubating and the situation has not improved in 1.4.0, we can look at tuning some knobs to make it run more efficiently. I cannot see the images attached to your original email, so I may be missing important information; if that is the case, please let me know.
*Situation 1: 1.4.0 is slower than 1.3.1-incubating.*

There is a behavior change introduced in 1.4.0: when we refer to the "left" or "right" side of the relation in Sedona configurations such as sedona.join.spartitionside and sedona.join.indexbuildside, we now always follow the SQL semantics of the join. Please refer to SEDONA-260 <https://issues.apache.org/jira/browse/SEDONA-260> for a detailed explanation of this change. If your query was affected by it, you can switch back to the old behavior using the following configurations:

sparkSession.conf.set("sedona.join.spartitionside", "right")
sparkSession.conf.set("sedona.join.indexbuildside", "right")

*Situation 2: The join is already slow in 1.3.1-incubating.*

The workload is quite unbalanced according to your description: 3000 of the 4102 tasks finished very quickly, while the final 100 tasks take over 1 hour each to complete. The spatial partitioning we pick may not produce a balanced workload for the join stage. There are several possible reasons.

1. The extents of the joined datasets differ too much. For example, if geom comes from a continent-scale dataset while start_lon/start_lat comes from a city-scale dataset, partitioning geom evenly won't produce a balanced workload for the join stage; partitioning the start_lon dataset instead would yield better performance. You can choose which relation is spatially partitioned with:

sparkSession.conf.set("sedona.join.spartitionside", "right" /* or "left", depending on the current partition side */)

2. The dataset may also be hard to partition. If we partition the start_lon side, the partitioned geometries are the results of evaluating ST_Collect(ST_Point(start_lon, start_lat), ST_Point(end_lon, end_lat)). If the start locations are far from the end locations, we end up partitioning a set of geometries with many large, overlapping bounding boxes, which makes it hard to get good join performance. In this particular case you can perform two separate joins (one intersecting the start location, one intersecting the end location) and then union the results; a rough sketch of this rewrite is included after the quoted message below. Hopefully it yields better performance than joining on the multipoints directly.

3. You've mentioned that disabling the global index improves job stability. That may be caused by poor spatial partitioning on the index build side, or the number of partitions may need to be increased so that the index fits into executor memory. I've also found that using an R-tree for indexing helps a lot in some cases; you can try it out with sparkSession.conf.set("sedona.global.indextype", "rtree"). Without the global index, the join task for each partition runs as a nested loop, which can be painfully slow, so it is better to leave it enabled in most cases.

Regards,
Kristin

On Thu, Mar 23, 2023 at 8:22 AM Trang Nguyen <[email protected]> wrote:

> Hi,
>
> I’ve upgraded from Sedona 1.3.1-incubating to 1.4.0 but am still seeing a
> significant slowdown in spatial joins as task processing increases.
>
> For instance, for a spatial partition count of 4102, the first 3000 tasks
> proceed in under 3 sec but then get progressively slower for the same amount
> of shuffle read records. The final 100 tasks take over 1 hour each to complete.
>
> I’ve tried to disable the global index which seemed to reduce failures but
> am still seeing the performance degradation:
>
> // sedona indexing properties
> sparkSession.conf.set("sedona.join.gridtype", "kdbtree")
> sparkSession.conf.set("sedona.global.index", "false")
> sparkSession.conf.set("sedona.join.indexbuildside", "left")
>
> if (appConf.getNrInputPartitions > 0) {
>   sparkSession.conf.set("spark.sql.shuffle.partitions",
>     appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("sedona.join.numpartition",
>     appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("spark.default.parallelism",
>     appConf.getNrInputPartitions.toString)
>   LOG.info(s"Set spark.default.parallelism, spark.sql.shuffle.partitions to: ${appConf.getNrInputPartitions}")
> }
>
> I am using a RangeJoin with the dataframe API:
>
> st_intersects(geom, ST_collect(ST_Point(start_lon, start_lat), ST_Point(end_lon, end_lat)))
>
> Is this a bug or are there steps or settings I could use to get to more
> stable performance?
>
> Thanks
> Trang
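
Here is the sketch I referred to in point 2 above: splitting the multipoint predicate into two separate joins and unioning the results. This is only a rough illustration; the polygons/trips DataFrame names and the trip_id/polygon_id key columns are placeholders, so substitute whatever your actual schema uses.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Sketch only: "polygons"/"trips" and the key columns are assumed names.
def splitIntersectsJoin(polygons: DataFrame, trips: DataFrame): DataFrame = {
  // Join once against the start location ...
  val startMatches = trips.join(
    polygons,
    expr("ST_Intersects(geom, ST_Point(start_lon, start_lat))"))

  // ... and once against the end location.
  val endMatches = trips.join(
    polygons,
    expr("ST_Intersects(geom, ST_Point(end_lon, end_lat))"))

  // A trip whose start and end fall in the same polygon matches twice,
  // so deduplicate on the key columns rather than the geometry column.
  startMatches.union(endMatches).dropDuplicates("trip_id", "polygon_id")
}

Each of the two joins now partitions plain points instead of long multipoints, so the partition bounding boxes overlap far less, and deduplicating on key columns avoids comparing geometry values directly.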
