Hi.

If 1.4.0 is slower than 1.3.1-incubating when running this join, the
performance degradation may be caused by a behavior change introduced in
1.4.0. If the join was already slow in 1.3.1-incubating and the situation
has not improved in 1.4.0, then we can see whether some knobs can be tuned
to make it run efficiently. I cannot see the images attached to your
original email, so I may be missing important information from it; if
so, please let me know.

*Situation 1: 1.4.0 is slower than 1.3.1-incubating.*
There is a behavior change introduced in 1.4.0: when we refer to the "left"
or "right" side of the relation in Sedona configurations such as
sedona.join.spartitionside and sedona.join.indexbuildside, we now always
stick to SQL semantics. You can refer to SEDONA-260
<https://issues.apache.org/jira/browse/SEDONA-260> for a detailed
explanation of this change. If your query was affected by it, you can
switch back to the old behavior using the following configurations:

sparkSession.conf.set("sedona.join.spartitionside", "right")
sparkSession.conf.set("sedona.join.indexbuildside", "right")

*Situation 2: The join is already slow in 1.3.1-incubating.*
The workload is quite unbalanced according to your description (3000 of the
4102 tasks finish very quickly, while the final 100 tasks take over 1 hour
each to complete). The spatial partitioning we have might not lead to a
balanced workload for the join stage. There are several possible reasons.

1. The extents of the joined datasets differ too much. For example, if geom
comes from a continent-scale dataset while start_lon/start_lat comes from a
city-scale dataset, then partitioning geom evenly won't produce a balanced
workload for the join stage. Partitioning the start_lon dataset in this
case would yield better performance. We can configure which relation gets
spatially partitioned using
sparkSession.conf.set("sedona.join.spartitionside", "right" /* or "left",
depending on the current partition side */)
2. The dataset may also be hard to partition. If we partition the start_lon
side, the result of evaluating ST_collect(ST_Point(start_lon, start_lat),
ST_Point(end_lon, end_lat)) will be partitioned. If start locations are far
away from end locations, we'll be partitioning a set of geometries
containing lots of large, overlapping boxes, which makes it hard to get
good join performance. In this particular case we can perform 2 joins
separately (intersect with the start location, intersect with the end
location), then union the results together; see the sketch after this
list. Hopefully that yields better performance than joining on multipoints
directly.
3. You've mentioned that disabling the global index improves job stability.
That may be caused by a poor spatial partitioning of the index build side,
or we may have to increase the number of partitions so the index fits into
executor memory. I've also found that using an R-tree for indexing helps a
lot in some cases; you can try it out by specifying
spark.conf.set("sedona.global.indextype", "rtree"). Without the global
index, the join task for each partition runs as a nested loop, which can be
painfully slow, so it is better to leave it enabled in most cases. A short
config sketch for this also follows the list.
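
For point 2, here is a minimal sketch of the split-and-union rewrite using
the SQL API. The table and column names (zones, trips, zone_id, trip_id)
are my assumptions based on your query; adjust them to your actual schema:

val startJoin = sparkSession.sql("""
  SELECT z.zone_id, t.trip_id
  FROM zones z JOIN trips t
  ON ST_Intersects(z.geom, ST_Point(t.start_lon, t.start_lat))
""")
val endJoin = sparkSession.sql("""
  SELECT z.zone_id, t.trip_id
  FROM zones z JOIN trips t
  ON ST_Intersects(z.geom, ST_Point(t.end_lon, t.end_lat))
""")
// Union the two results and drop duplicates where both the start and the
// end point fall into the same zone, matching the multipoint semantics.
val result = startJoin.union(endJoin).distinct()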
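
For point 3, a minimal sketch of re-enabling the global index with an
R-tree and a larger partition count. The value 8000 is only an illustrative
number; tune it until the per-partition index fits into executor memory:

sparkSession.conf.set("sedona.global.index", "true")
sparkSession.conf.set("sedona.global.indextype", "rtree")
sparkSession.conf.set("sedona.join.numpartition", "8000")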

Regards
Kristin

On Thu, Mar 23, 2023 at 8:22 AM Trang Nguyen <[email protected]> wrote:

> Hi,
>
>
> I’ve upgraded from Sedona 1.3.1-incubating to 1.4.0 but am still seeing a
> significant slowdown in spatial joins as task processing increases.
>
>
>
> For instance, with a spatial partition count of 4102, the first 3000 tasks
> complete in under 3 sec, but task times then get progressively worse for the
> same amount of shuffle read records.
>
> The final 100 tasks take over 1 hour each to complete.
>
>
>
> I’ve tried to disable the global index which seemed to reduce failures but
> am still seeing the performance degradation:
>
>
>
> // sedona indexing properties
> sparkSession.conf.set("sedona.join.gridtype", "kdbtree")
> sparkSession.conf.set("sedona.global.index", "false")
> sparkSession.conf.set("sedona.join.indexbuildside", "left")
>
> if (appConf.getNrInputPartitions > 0) {
>   sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("sedona.join.numpartition", appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("spark.default.parallelism", appConf.getNrInputPartitions.toString)
>   LOG.info(s"Set spark.default.parallelism, spark.sql.shuffle.partitions to: ${appConf.getNrInputPartitions}")
> }
>
> I am using a RangeJoin with the dataframe API:
>
> st_intersects(geom, ST_collect(ST_Point(start_lon, start_lat),
> ST_Point(end_lon, end_lat)))
>
> Is this a bug or are there steps or settings I could use to get to more
> stable performance?
>
>
>
> Thanks
> Trang
>
