The union worked like a charm, thanks again.
Another question: since I would like to reuse the spatial indexes in other DAG 
phases, how can I cache them using the DataFrame API?

Trang
-----Original Message-----
From: Trang Nguyen <[email protected]> 
Sent: Wednesday, March 22, 2023 8:59 PM
To: [email protected]
Subject: RE: Performance degradation for spatial join in Spark during task 
processing

Hi Kristin,

Thanks so much for the quick response and excellent explanation.
I’ve observed the same behavior in 1.3.1 as well, so it is very likely due to 
data skew as you pointed out.
I will try out your suggestions to see whether they improve the behavior.

Best,
Trang

From: Kristin Cowalcijk <[email protected]>
Sent: Wednesday, March 22, 2023 8:12 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Performance degradation for spatial join in Spark during task 
processing

Hi.

If 1.4.0 is slower than 1.3.1-incubating when running this join, the behavior 
change introduced in 1.4.0 might be the cause of the performance degradation. If 
the join was already slow in 1.3.1-incubating and the situation has not improved 
in 1.4.0, then we can see whether we can tune some knobs to make it run 
efficiently. I cannot see the images attached to your original email, so I may 
be missing important information from it; if that is the case, please let me 
know.

*Situation 1: 1.4.0 is slower than 1.3.1-incubating.* There is a behavior 
change introduced in 1.4.0: when we refer to the "left" or "right" side of the 
relation in Sedona configurations such as sedona.join.spartitionside and 
sedona.join.indexbuildside, we now always stick to the SQL semantics. You can 
refer to SEDONA-260 <https://issues.apache.org/jira/browse/SEDONA-260> for a 
detailed explanation of this change. If your query was affected by this change, 
you can switch back to the old behavior with the following configurations:

sparkSession.conf.set("sedona.join.spartitionside", "right") 
sparkSession.conf.set("sedona.join.indexbuildside", "right")

*Situation 2: The join is already slow in 1.3.1-incubating.* The workload is 
quite unbalanced according to your description (3000 of the 4102 tasks finished 
very quickly while the final 100 tasks take over 1 hour each to complete). The 
spatial partitioning we have might not produce a balanced workload for the join 
stage. There are several possible reasons:

1. The extents of the joined datasets differ too much. For example, if geom 
comes from a continent-scale dataset while start_lon/start_lat comes from a 
city-scale dataset, then partitioning geom evenly won't produce a balanced 
workload for the join stage. Partitioning the start_lon dataset would yield 
better performance in this case. We can configure which relation is spatially 
partitioned using 
sparkSession.conf.set("sedona.join.spartitionside", "right" /* or left, 
depending on the current partition side */)
2. The dataset may also be hard to partition. If we partition the start_lon 
side, what gets partitioned is the result of evaluating 
ST_collect(ST_Point(start_lon, start_lat), ST_Point(end_lon, end_lat)). If 
start locations are far away from end locations, we'll be partitioning a set of 
geometries with lots of large, overlapping bounding boxes, which makes it hard 
to get good join performance. In this particular case we can perform 2 joins 
separately (intersect with the start location, intersect with the end 
location), then union the results together; there is a rough sketch of this 
below. Hopefully that yields better performance than joining on multipoints 
directly.
3. You've mentioned that disabling the global index improves job stability. 
That may be caused by poor spatial partitioning of the index build side, or we 
may have to increase the number of partitions to make the index fit into 
executor memory. I've also found that using an R-tree for indexing helps a lot 
in some cases; you can try it out by specifying 
sparkSession.conf.set("sedona.global.indextype", "rtree"). With the global 
index disabled, the join task for each partition runs as a nested loop, which 
can be painfully slow, so it is better to leave it enabled in most cases.
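
To make the two-join idea from point 2 concrete, here is a rough sketch using 
the DataFrame API (tripsDf and regionsDf are just placeholder names for your 
point-side and geom-side relations; adjust the column names to your schema):

import org.apache.spark.sql.functions.expr

// Join each endpoint against the polygons separately instead of joining on
// the ST_collect multipoint.
val startMatches = tripsDf.join(
  regionsDf, expr("ST_Intersects(geom, ST_Point(start_lon, start_lat))"))
val endMatches = tripsDf.join(
  regionsDf, expr("ST_Intersects(geom, ST_Point(end_lon, end_lat))"))

// Union the two results; distinct() drops duplicate matches for trips whose
// start and end fall into the same polygon.
val joined = startMatches.union(endMatches).distinct()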

Regards
Kristin

On Thu, Mar 23, 2023 at 8:22 AM Trang Nguyen <[email protected]> wrote:

> Hi,
>
>
> I’ve upgraded from Sedona 1.3.1-incubating to 1.4.0 but am still 
> seeing a significant slowdown in spatial joins as task processing increases.
>
>
>
> For instance, with a spatial partition count of 4102, the first 3000 tasks 
> complete in under 3 seconds, but the remaining tasks get progressively slower 
> for the same amount of shuffle read records.
>
> The final 100 tasks take over 1 hour each to complete.
>
>
>
> I’ve tried disabling the global index, which seemed to reduce failures, but 
> am still seeing the performance degradation:
>
>
>
> // sedona indexing properties
> sparkSession.conf.set("sedona.join.gridtype", "kdbtree")
> sparkSession.conf.set("sedona.global.index", "false")
> sparkSession.conf.set("sedona.join.indexbuildside", "left")
>
> if (appConf.getNrInputPartitions > 0) {
>   sparkSession.conf.set("spark.sql.shuffle.partitions",
>     appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("sedona.join.numpartition",
>     appConf.getNrInputPartitions.toString)
>   sparkSession.conf.set("spark.default.parallelism",
>     appConf.getNrInputPartitions.toString)
>   LOG.info(s"Set spark.default.parallelism, spark.sql.shuffle.partitions to: ${appConf.getNrInputPartitions}")
> }
>
> I am using a RangeJoin with the DataFrame API:
>
> st_intersects(geom, ST_collect(ST_Point(start_lon, start_lat), 
> ST_Point(end_lon, end_lat)))
>
> Is this a bug, or are there steps or settings I could use to get more 
> stable performance?
>
>
>
> Thanks
> Trang
>
