Hello Sedona community,

I have been benchmarking the following geospatial range join query (and similar 
queries) using Sedona on serverless Spark clusters provisioned via AWS Glue 
notebooks, running Sedona v1.5.0. The PySpark DataFrames 't' and 'g' are both 
loaded from Parquet files hosted on Amazon S3 and are partitioned in nested 
folder structures by region, year, month and day. Specifically, 'g' is a 
polygon layer with millions of rows representing polygons across Canada, and 
't' is a point layer with about 200 columns and up to tens of billions of rows, 
depending on the region/year/month/day chosen, representing points across 
Canada and the U.S.

result = sedona.sql(
    """
    select t.device_uid, g.grid_id
    from t, g
    where st_contains(ST_GeomFromWKT(g.geometry), ST_GeomFromWKT(t.geometry))
    """
)


I was able to increase the Spark cluster size in AWS Glue to handle larger 
volumes of the 't' dataset. For instance, as a stress test, I completed a query 
with 't' amounting to 14 billion rows in Canada in a bit over 3 hours, on a 
Spark cluster with 2 TB of memory. In general I am impressed with the 
performance; my questions below are about potential avenues to improve the 
performance a bit further, if possible, and to help me understand Sedona 
performance tuning more deeply. I am attaching the result of running 'explain' 
on my query. Any comments and suggestions are greatly appreciated, and I 
apologize for the long message.
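For reference, the attached plan was captured with the standard DataFrame 
explain call, roughly:

result.explain(True)   # prints parsed/analyzed/optimized/physical plans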

1) GeoParquet-level predicate pushdown

Based on https://sedona.apache.org/1.5.0/api/sql/Optimizer/, my understanding 
is that GeoParquet-level predicate pushdown happens automatically. I have 
experimented with roughly equal volumes of 't' in Canada and the U.S. 
respectively. When 't' is on the order of hundreds of millions of objects in 
the U.S., the above query completes in 1/6 of the time taken by the same volume 
of 't' data in Canada, with an identical size and configuration of Spark 
cluster. That seems to confirm that GeoParquet-level predicate pushdown is at 
play and causes 'polygons in Canada containing points in the U.S.' to return 
almost immediately.

When I increased the volume of 't' 10x, to the order of billions of point 
objects, the U.S. case was still faster than the Canada case, but only 
marginally, on a Spark cluster 8x larger in both cases. Given this seemingly 
diminishing benefit of GeoParquet-level predicate pushdown, I wonder whether I 
am hitting a limitation of Sedona's predicate pushdown mechanism, or whether it 
is due to the way Spark clusters are scaled up in AWS Glue.

In general, how can one confirm, from the Spark logs or the query plan, that 
GeoParquet-level predicate pushdown is active? Can it be turned on and off? And 
if so, can one see statistics on the volume of data and the number of files 
scanned?
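To make the question concrete, here is a minimal sketch of where I would expect 
to look, assuming the points are read with the 'geoparquet' data source and 
have a geometry column simply named 'geometry' (the S3 path and the polygon WKT 
below are placeholders, not my real data):

df = sedona.read.format("geoparquet").load("s3://my-bucket/points/")   # placeholder path
filtered = df.where(
    "ST_Intersects(geometry, "
    "ST_GeomFromWKT('POLYGON ((-80 43, -79 43, -79 44, -80 44, -80 43))'))"
)
filtered.explain(True)
# I would then look at the FileScan node for the geoparquet relation, and at
# the scan metrics in the Spark UI (e.g. number of files read / output rows),
# to see whether the spatial predicate actually reduced the scan.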

As a related note and question: 
https://sedona.apache.org/1.5.0/api/sql/Optimizer/, under "Push spatial 
predicates to GeoParquet", states that "To maximize the performance of Sedona 
GeoParquet filter pushdown, we suggest that you sort the data by their geohash 
values and then save as a GeoParquet file". In my case, the large point layer 
't' does have a geohash6 column, and I took the following steps to see if that 
would help (a rough sketch follows the list):

a)  select * from 't' order by t.geohash6
b)  save the result to S3 as GeoParquet files, following the same partitioning scheme
c)  run the 'g' contains 't' query against the newly ordered 't' dataset with a Spark cluster of the same size and configuration
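In PySpark terms, steps a) and b) looked roughly like the sketch below. The S3 
paths are placeholders, the geohash column is location_geohash6 as in the 
attached schema, and I am assuming the geometry is rebuilt from the 
longitude/latitude columns before writing GeoParquet:

from pyspark.sql import functions as F

t = sedona.read.parquet("s3://my-bucket/t/")                          # placeholder path
t = t.withColumn("geometry",
                 F.expr("ST_Point(location_longitude, location_latitude)"))
(
    t.orderBy("location_geohash6")                    # step a): sort by the geohash column
     .write.format("geoparquet")                      # step b): save back to S3 as GeoParquet,
     .partitionBy("region", "year", "month", "day")   # keeping the same partitioning scheme
     .mode("overwrite")
     .save("s3://my-bucket/t_sorted_by_geohash/")     # placeholder path
)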

Based on my results, that does help performance a little, but the improvement 
is so marginal as to be negligible. So I am a little confused as to whether 
GeoParquet-level predicate pushdown is active or not.

2) The use of the PySpark DataFrame function repartition(n, column)

I have experimented with the repartition function on the dataset 't', i.e., the 
layer consisting of a large number of points; a rough sketch is below. In my 
experience, increasing the count n helps performance up to a certain point, 
beyond which having more partitions can actually lead to slower queries and may 
also cause the Spark cluster to fail with out-of-memory errors.
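Concretely, the repartitioning step looks roughly like this (the partition 
count and path are placeholders; I have been tuning the count by trial and 
error):

t = sedona.read.parquet("s3://my-bucket/t/")       # placeholder path
t = t.repartition(20000, "location_geohash6")      # repartition(n, column); I have also tried plain repartition(n)
t.createOrReplaceTempView("t")                     # so the join query above runs against the repartitioned data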

If I understand correctly, finding the right partition count is a matter of 
experimentation. Is there anything in the Spark logs that could indicate that 
the number of partitions is too low or too high?

A second sub-question regarding repartition: should one repartition all 
DataFrames involved in a query to improve performance, or just the biggest one? 
In my case, since the point layer 't' is much larger than the polygon layer 'g' 
in terms of both object count and data volume in GB on S3, I have only 
repartitioned 't' so far.

3) Is there anything else you would recommend trying to improve the performance 
of my query?

Thank you!

Hanxi





== Parsed Logical Plan ==
'Project ['t.device_uid, 'g.grid_id]
+- 'Filter 'st_contains('ST_GeomFromWKT('g.geometry), 
'ST_GeomFromWKT('t.geometry))
   +- 'Join Inner
      :- 'SubqueryAlias t
      :  +- 'UnresolvedRelation [tutela1], [], false
      +- 'SubqueryAlias g
         +- 'UnresolvedRelation [grid], [], false

== Analyzed Logical Plan ==
device_uid: string, grid_id: bigint
Project [device_uid#14, grid_id#714L]
+- Filter  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
   +- Join Inner
      :- SubqueryAlias t
      :  +- SubqueryAlias tutela1
      :     +- View (`tutela1`, 
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,device_hsdpacapable#38,device_umtscapable#39,device_hspaevolvedcapable#40,device_ltecapable#41,device_lteadvancedcapable#42,device_ltecategorycapable#43L,device_voltecapable#44,device_wificapable#45,device_vowificapable#46,device_rcscapable#47,device_voiceovercellularcapable#48,device_2gcapable#49,device_3gcapable#50,device_4gcapable#51,device_5gcapable#52,device_gpscapable#53,device_screenwidth#54L,device_screenheight#55L,device_storage#56L,device_usedstorage#57L,device_freestorage#58L,device_memory#59L,device_usedmemory#60L,device_freememory#61L,device_cpu#62,device_batterylevel#63,device_batterystate#64,device_systemuptime#65L,device_speed#66,device_bearing#67,device_simslotcount#68L,device_esimcount#69L,device_simsize#70,device_simmcc#71,device_simmnc#72,device_simserviceprovider#73,device_simserviceproviderbrandname#74,device_simspecificserviceprovider#75,device_simcountry#76,device_simstate#77,device_simservicetype#78,device_simaccesspointname#79,device_sdkversion#80,device_sdkconfiguration#81,connection_start#82,connection_end#83,connection_length#84L,connection_timezone#85L,connection_type#86,connection_category#87,connection_technology#88,connection_generationcategory#89,connection_mcc#90,connection_mnc#91,connection_towermcc#92,connection_towermnc#93,connection_lac#94L,connection_cid#95L,connection_enodebid#96,connection_sectorid#97L,connection_towercelltype#98,connection_serviceprovider#99,connection_serviceproviderbrandname#100,connection_mobilechannel#101L,connection_mobilefrequency#102,connection_band#103,connection_bsic#104L,connection_pci#105L,connection_psc#106L,connection_cpid#107L,connection_bandwidth#108L,connection_bssid#109,connection_ssid#110,connection_wififrequency#111L,connection_wifichannel#112L,connection_wifichannelwidth#113,connection_wifimanufacturer#114,connection_internetasn#115,connection_internetserviceprovider#116,connection_internetserviceproviderorganization#117,location_latitude#118,location_longitude#119,location_altitude#120,location_horizontalaccuracy#121,location_verticalaccuracy#122,location_availability#123,location_horizontalaccuracyassessment#124,location_assessmentmodelversion#125,location_country#126,location_region#127,location_city#128,location_geohash6#129,location_geohash7#130,location_geohash8#131,qos_date#132,qos_localdate#133,qos_uploadthroughput#134,qos_uploadthroughputdnslookup#135L,qos_uploadthroughputtimetofirstaccess#136L,qos_uploadthroughputteststatus#137,qos_uploadthroughputtestserver#138,qos_uploadthroughputtestsize#139L,qos_downloadthroughput#140,qos_downloadthroughputdnslookup#141L,qos_downloadthroughputtimetofirstaccess#142L,qos_downloadthroughputtimetofirstbyte#143L,qos_downloadthroughputteststatus#144,qos_downloadthroughputtestserver#145,qos_downloadthroughputtestsize#146L,qos_latencymin#147,qos_latencyaverage#148,qos_jittermin#149,qos_jitteraverage#150,qos_packetlossdiscardpercentage#151,qos_packetlosslostpercentage#152,qos_serverresponsednslookup#153L,qos_serverresponseteststatus#154,qos_serverresponse
testserver#155,qos_serverresponsetestconfigpacketdelay#156L,qos_serverresponsetestconfigpacketsize#157L,qos_serverresponsetestconfigsentpackets#158L,qos_icmplatencymin#159,qos_icmplatencyaverage#160,qos_icmppacketlosslostpercentage#161,qos_icmpteststatus#162,qos_icmptraceroutehops#163,qos_icmptraceroutehopcount#164L,qos_icmptracerouteteststatus#165,qos_icmptestserver#166,qos_icmptestconfigpacketdelay#167L,qos_icmptestconfigpacketsize#168L,qos_icmptestconfigsentpackets#169L,qos_icmptestconfigttl#170L,qos_icmptestconfigbytessent#171L,qos_icmptestconfigarguments#172,qos_icmptraceroutetestconfigpacketdelay#173L,qos_icmptraceroutetestconfigmaxhopcount#174L,qos_icmptraceroutetestsentpackets#175L,qos_visiblecelltowers#176L,qos_cellbandwidths#177,qos_linkspeed#178L,qos_linkspeedreceived#179L,qos_linkdownstreambandwidth#180L,qos_linkupstreambandwidth#181L,qos_signalstrength#182L,qos_signallevel#183L,qos_asu#184L,qos_deltatransmittedbytes#185L,qos_deltareceivedbytes#186L,qos_deltatransceivedbytes#187L,qos_rsrp#188L,qos_rsrq#189L,qos_rssnr#190L,qos_cqi#191L,qos_ta#192L,qos_ecio#193L,qos_evdosnr#194L,qos_gsmbiterrorrate#195L,qos_ecno#196L,qos_newradiocsirsrp#197L,qos_newradiocsirsrq#198L,qos_newradiocsisinr#199L,qos_newradiossrsrp#200L,qos_newradiossrsrq#201L,qos_newradiosssinr#202L,qos_newradiostate#203,qos_newradioendcstate#204,qos_newradiofrequencyrange#205,qos_appstate#206,qos_voiceservicestate#207,qos_ltevoiceduplexmode#208,qos_vpnconnected#209,qos_userinteraction#210,qos_dozemode#211,qos_networkrestrictions#212,qos_callstate#213,qos_castate#214,qos_mobiledataroamingstate#215,qos_tetheringstate#216,qos_dnsaddresses#217,qos_airplanemode#218,qos_testtrigger#219L,meta_createddate#220,device_typeallocationcode#221,device_hardwareclassification#222,device_esimstate#223,connection_cellbands#224,connection_cellplmns#225,connection_equivalenthomeplmns#226,connection_meteredstate#227,connection_networkcapabilities#228,connection_cellulardataallowed#229,connection_cellulardatadisabledreason#230,connection_wifiprotocol#231,location_applocationaccuracy#232,location_continent#233,qos_icmptraceroutemaxhopcount#234L,qos_rscp#235L,qos_cqitableindex#236L,qos_csicqitableindex#237L,qos_csicqireport#238,qos_appstandbybucket#239,meta_functionsversion#240,qos_day#241,region#242,year#243,month#244,day#245,geometry#479])
      :        +- Repartition 50000, true
      :           +- Project [device_uid#14, device_manufacturer#15, 
device_manufacturerbrandname#16, device_model#17, device_modelbrandname#18, 
device_os#19, device_osbrandname#20, device_language#21, device_locale#22, 
device_useragent#23, device_isrooted#24, device_yearreleased#25L, 
device_primaryhardwaretype#26, device_chipsetmanufacturer#27, 
device_chipsetbrandname#28, device_chipsetmodel#29, device_cpubrandname#30, 
device_cpucores#31, device_cpumaxfrequency#32, device_gpuname#33, 
device_csdcapable#34, device_hscsdcapable#35, device_gprscapable#36, 
device_edgecapable#37, ... 209 more fields]
      :              +- Relation 
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,...
 208 more fields] parquet
      +- SubqueryAlias g
         +- SubqueryAlias grid
            +- View (`grid`, 
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731])
               +- Relation 
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
 parquet

== Optimized Logical Plan ==
Project [device_uid#14, grid_id#714L]
+- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
   :- Repartition 50000, true
   :  +- Project [device_uid#14, pythonUDF0#752 AS geometry#479]
   :     +- BatchEvalPython [convert_wkt(location_longitude#119, 
location_latitude#118)#478], [pythonUDF0#752]
   :        +- Project [device_uid#14, location_latitude#118, 
location_longitude#119]
   :           +- Relation 
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,...
 208 more fields] parquet
   +- Project [grid_id#714L, geometry#725]
      +- Relation 
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
 parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_uid#14, grid_id#714L]
   +- RangeJoin  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT** 
 ,  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**  , WITHIN
      :- Exchange RoundRobinPartitioning(50000), REPARTITION_BY_NUM, [id=#60]
      :  +- Project [device_uid#14, pythonUDF0#752 AS geometry#479]
      :     +- BatchEvalPython [convert_wkt(location_longitude#119, 
location_latitude#118)#478], [pythonUDF0#752]
      :        +- Project [device_uid#14, location_latitude#118, 
location_longitude#119]
      :           +- FileScan parquet 
[device_uid#14,location_latitude#118,location_longitude#119,region#242,year#243,month#244,day#245]
 Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 
paths)[s3://000crcdatalake-hanxi/tutela_ca_all], PartitionFilters: [], 
PushedFilters: [], ReadSchema: 
struct<device_uid:string,location_latitude:double,location_longitude:double>
      +- Project [grid_id#714L, geometry#725]
         +- FileScan parquet 
[grid_id#714L,geometry#725,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
 Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 
paths)[s3://000crcdatalake/ised_grid], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<grid_id:bigint,geometry:string>

