Hello Sedona community,
I have been benchmarking the following geospatial range join query (and similar
ones) using Sedona, on serverless Spark clusters provisioned via AWS Glue
notebooks. I am using Sedona v1.5.0. The PySpark dataframes 't' and 'g' are
both loaded from Parquet files hosted on Amazon S3, and are partitioned in
nested folder structures by region, year, month and date.
Specifically, 'g' is a polygon layer with millions of rows representing
polygons across Canada, and 't' is a point layer with about 200 columns
and up to tens of billions of rows depending on the region/year/month/date
chosen, representing points across Canada and the U.S.
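For context, the setup looks roughly like this (bucket names and paths are
simplified placeholders, and the SedonaContext initialization follows the 1.5
docs):

from sedona.spark import SedonaContext

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

# Both layers are read from S3 and registered as temp views for SQL.
t = sedona.read.parquet("s3://my-bucket/points/")    # large point layer
g = sedona.read.parquet("s3://my-bucket/polygons/")  # polygon grid layer
t.createOrReplaceTempView("t")
g.createOrReplaceTempView("g")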
result = sedona.sql(
    """
    select t.device_uid, g.grid_id
    from t, g
    where st_contains(ST_GeomFromWKT(g.geometry),
                      ST_GeomFromWKT(t.geometry))
    """
)
I have been able to increase the Spark cluster size in AWS Glue to handle
larger volumes of the 't' dataset. For instance, as a stress test, I completed
a query with 't' amounting to 14 billion rows in Canada in a bit over 3 hours,
on a Spark cluster with 2 TB of memory. I am in general impressed with the
performance; my questions below concern potential avenues for improving it a
bit further, and for deepening my understanding of Sedona performance tuning.
I am attaching the result of running 'explain' on my query. Any comments and
suggestions are greatly appreciated, and I apologize for the long message.
1) GeoParquet-level predicate pushdown
Based on https://sedona.apache.org/1.5.0/api/sql/Optimizer/, my understanding
is that GeoParquet-level predicate pushdown happens automatically. I have
experimented with roughly equal volumes of 't' in Canada and the U.S.
When 't' is on the order of hundreds of millions of objects in the U.S., the
above query completes in 1/6 of the time taken by the same volume of 't' data
in Canada, on a Spark cluster of identical size and configuration. That seems
to confirm that GeoParquet-level predicate pushdown is at play and causes
'polygons in Canada containing points in the U.S.' to return right away.
When I increased the volume of 't' 10x, to the order of billions of point
objects, the U.S. case was still faster than the Canada case, but only
marginally, on a Spark cluster 8x larger in both cases. Given this seemingly
diminishing benefit of GeoParquet-level predicate pushdown, I wonder whether I
am hitting a limitation of Sedona's predicate pushdown mechanism, or whether
this is due to limitations in the way Spark clusters are scaled up in AWS
Glue.
In general, how can one confirm, based on Spark logs etc., that
GeoParquet-level predicate pushdown is active? Can it be turned on and off?
And if so, can one see statistics on the volumes of data and files scanned?
The snippet below shows the only check I know of so far.
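For reference, this is roughly how I have been inspecting the scan nodes; I am
assuming that a pushed-down spatial predicate would show up in the FileScan
node's filter entries, but please correct me if that assumption is wrong:

# Print the physical plan in Spark 3's formatted mode; each FileScan node
# lists its PartitionFilters and PushedFilters.
result.explain("formatted")

# In the plan attached below, both scans report "PushedFilters: []",
# which is part of why I am unsure whether the pushdown is active.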
As a related note and question:
https://sedona.apache.org/1.5.0/api/sql/Optimizer/, under "Push spatial
predicates to GeoParquet", states that "To maximize the performance of
Sedona GeoParquet filter pushdown, we suggest that you sort the data by their
geohash values and then save as a GeoParquet file". In my case, the large
point layer 't' does have a geohash6 column, so I took the following steps to
see whether that would help (a rough sketch follows the list):
a) select * from 't' order by t.geohash6
b) save the result to S3 as GeoParquet files, following the same partitioning
scheme
c) run the 'g' contains 't' query against the newly ordered 't' dataset, on a
Spark cluster of the same size and configuration
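Steps a) and b) looked roughly like this (the output path is hypothetical, and
I am assuming Sedona's 'geoparquet' format writer, which expects a
geometry-typed column, so a WKT string column would need an ST_GeomFromWKT
conversion first):

# a) order the point layer by its geohash column
t_sorted = sedona.sql("select * from t order by geohash6")

# b) write back to S3 with the same partitioning scheme
(t_sorted.write
    .partitionBy("region", "year", "month", "day")
    .format("geoparquet")
    .mode("overwrite")
    .save("s3://my-bucket/t_sorted/"))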
Based on my results, this does help performance a little, but so marginally
that it is negligible. I am a little confused as to whether GeoParquet-level
predicate pushdown is active or not.
2) The use of the PySpark dataframe function repartition(n, column)
I have experimented with the repartition function on the dataset 't', i.e.,
the layer consisting of a large number of points. In my experience, increasing
the count n helps performance up to a certain point, beyond which more
partitions can actually lead to slower queries, and may also cause the Spark
cluster to fail with out-of-memory errors.
If I understand correctly, finding the right partition count is a matter of
experimentation. Is there something in the Spark logs that could indicate
that the number of partitions is too low or too high?
A second sub-question regarding repartition: should one repartition all
dataframes involved in a query to improve performance, or just the biggest
one? In my case, since the point layer 't' is much larger than the polygon
layer 'g' in terms of both object count and data volume in GB on S3, I have
only repartitioned 't' so far, roughly as shown below.
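For reference, what I am currently doing is roughly the following; the count
50000 matches the Repartition node in the attached plan and was arrived at by
trial and error:

# Repartition only the large point layer before registering the view;
# 'g' is left as-is since it is much smaller than 't'.
t = t.repartition(50000)
t.createOrReplaceTempView("t")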
3) Anything else that you would recommend trying to improve the performance of
my query?
Thank you!
Hanxi
== Parsed Logical Plan ==
'Project ['t.device_uid, 'g.grid_id]
+- 'Filter 'st_contains('ST_GeomFromWKT('g.geometry),
'ST_GeomFromWKT('t.geometry))
+- 'Join Inner
:- 'SubqueryAlias t
: +- 'UnresolvedRelation [tutela1], [], false
+- 'SubqueryAlias g
+- 'UnresolvedRelation [grid], [], false
== Analyzed Logical Plan ==
device_uid: string, grid_id: bigint
Project [device_uid#14, grid_id#714L]
+- Filter org.apache.spark.sql.sedona_sql.expressions.ST_Contains
+- Join Inner
:- SubqueryAlias t
: +- SubqueryAlias tutela1
: +- View (`tutela1`,
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,device_hsdpacapable#38,device_umtscapable#39,device_hspaevolvedcapable#40,device_ltecapable#41,device_lteadvancedcapable#42,device_ltecategorycapable#43L,device_voltecapable#44,device_wificapable#45,device_vowificapable#46,device_rcscapable#47,device_voiceovercellularcapable#48,device_2gcapable#49,device_3gcapable#50,device_4gcapable#51,device_5gcapable#52,device_gpscapable#53,device_screenwidth#54L,device_screenheight#55L,device_storage#56L,device_usedstorage#57L,device_freestorage#58L,device_memory#59L,device_usedmemory#60L,device_freememory#61L,device_cpu#62,device_batterylevel#63,device_batterystate#64,device_systemuptime#65L,device_speed#66,device_bearing#67,device_simslotcount#68L,device_esimcount#69L,device_simsize#70,device_simmcc#71,device_simmnc#72,device_simserviceprovider#73,device_simserviceproviderbrandname#74,device_simspecificserviceprovider#75,device_simcountry#76,device_simstate#77,device_simservicetype#78,device_simaccesspointname#79,device_sdkversion#80,device_sdkconfiguration#81,connection_start#82,connection_end#83,connection_length#84L,connection_timezone#85L,connection_type#86,connection_category#87,connection_technology#88,connection_generationcategory#89,connection_mcc#90,connection_mnc#91,connection_towermcc#92,connection_towermnc#93,connection_lac#94L,connection_cid#95L,connection_enodebid#96,connection_sectorid#97L,connection_towercelltype#98,connection_serviceprovider#99,connection_serviceproviderbrandname#100,connection_mobilechannel#101L,connection_mobilefrequency#102,connection_band#103,connection_bsic#104L,connection_pci#105L,connection_psc#106L,connection_cpid#107L,connection_bandwidth#108L,connection_bssid#109,connection_ssid#110,connection_wififrequency#111L,connection_wifichannel#112L,connection_wifichannelwidth#113,connection_wifimanufacturer#114,connection_internetasn#115,connection_internetserviceprovider#116,connection_internetserviceproviderorganization#117,location_latitude#118,location_longitude#119,location_altitude#120,location_horizontalaccuracy#121,location_verticalaccuracy#122,location_availability#123,location_horizontalaccuracyassessment#124,location_assessmentmodelversion#125,location_country#126,location_region#127,location_city#128,location_geohash6#129,location_geohash7#130,location_geohash8#131,qos_date#132,qos_localdate#133,qos_uploadthroughput#134,qos_uploadthroughputdnslookup#135L,qos_uploadthroughputtimetofirstaccess#136L,qos_uploadthroughputteststatus#137,qos_uploadthroughputtestserver#138,qos_uploadthroughputtestsize#139L,qos_downloadthroughput#140,qos_downloadthroughputdnslookup#141L,qos_downloadthroughputtimetofirstaccess#142L,qos_downloadthroughputtimetofirstbyte#143L,qos_downloadthroughputteststatus#144,qos_downloadthroughputtestserver#145,qos_downloadthroughputtestsize#146L,qos_latencymin#147,qos_latencyaverage#148,qos_jittermin#149,qos_jitteraverage#150,qos_packetlossdiscardpercentage#151,qos_packetlosslostpercentage#152,qos_serverresponsednslookup#153L,qos_serverresponseteststatus#154,qos_serverresponse
testserver#155,qos_serverresponsetestconfigpacketdelay#156L,qos_serverresponsetestconfigpacketsize#157L,qos_serverresponsetestconfigsentpackets#158L,qos_icmplatencymin#159,qos_icmplatencyaverage#160,qos_icmppacketlosslostpercentage#161,qos_icmpteststatus#162,qos_icmptraceroutehops#163,qos_icmptraceroutehopcount#164L,qos_icmptracerouteteststatus#165,qos_icmptestserver#166,qos_icmptestconfigpacketdelay#167L,qos_icmptestconfigpacketsize#168L,qos_icmptestconfigsentpackets#169L,qos_icmptestconfigttl#170L,qos_icmptestconfigbytessent#171L,qos_icmptestconfigarguments#172,qos_icmptraceroutetestconfigpacketdelay#173L,qos_icmptraceroutetestconfigmaxhopcount#174L,qos_icmptraceroutetestsentpackets#175L,qos_visiblecelltowers#176L,qos_cellbandwidths#177,qos_linkspeed#178L,qos_linkspeedreceived#179L,qos_linkdownstreambandwidth#180L,qos_linkupstreambandwidth#181L,qos_signalstrength#182L,qos_signallevel#183L,qos_asu#184L,qos_deltatransmittedbytes#185L,qos_deltareceivedbytes#186L,qos_deltatransceivedbytes#187L,qos_rsrp#188L,qos_rsrq#189L,qos_rssnr#190L,qos_cqi#191L,qos_ta#192L,qos_ecio#193L,qos_evdosnr#194L,qos_gsmbiterrorrate#195L,qos_ecno#196L,qos_newradiocsirsrp#197L,qos_newradiocsirsrq#198L,qos_newradiocsisinr#199L,qos_newradiossrsrp#200L,qos_newradiossrsrq#201L,qos_newradiosssinr#202L,qos_newradiostate#203,qos_newradioendcstate#204,qos_newradiofrequencyrange#205,qos_appstate#206,qos_voiceservicestate#207,qos_ltevoiceduplexmode#208,qos_vpnconnected#209,qos_userinteraction#210,qos_dozemode#211,qos_networkrestrictions#212,qos_callstate#213,qos_castate#214,qos_mobiledataroamingstate#215,qos_tetheringstate#216,qos_dnsaddresses#217,qos_airplanemode#218,qos_testtrigger#219L,meta_createddate#220,device_typeallocationcode#221,device_hardwareclassification#222,device_esimstate#223,connection_cellbands#224,connection_cellplmns#225,connection_equivalenthomeplmns#226,connection_meteredstate#227,connection_networkcapabilities#228,connection_cellulardataallowed#229,connection_cellulardatadisabledreason#230,connection_wifiprotocol#231,location_applocationaccuracy#232,location_continent#233,qos_icmptraceroutemaxhopcount#234L,qos_rscp#235L,qos_cqitableindex#236L,qos_csicqitableindex#237L,qos_csicqireport#238,qos_appstandbybucket#239,meta_functionsversion#240,qos_day#241,region#242,year#243,month#244,day#245,geometry#479])
: +- Repartition 50000, true
: +- Project [device_uid#14, device_manufacturer#15,
device_manufacturerbrandname#16, device_model#17, device_modelbrandname#18,
device_os#19, device_osbrandname#20, device_language#21, device_locale#22,
device_useragent#23, device_isrooted#24, device_yearreleased#25L,
device_primaryhardwaretype#26, device_chipsetmanufacturer#27,
device_chipsetbrandname#28, device_chipsetmodel#29, device_cpubrandname#30,
device_cpucores#31, device_cpumaxfrequency#32, device_gpuname#33,
device_csdcapable#34, device_hscsdcapable#35, device_gprscapable#36,
device_edgecapable#37, ... 209 more fields]
: +- Relation
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,...
208 more fields] parquet
+- SubqueryAlias g
+- SubqueryAlias grid
+- View (`grid`,
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731])
+- Relation
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
parquet
== Optimized Logical Plan ==
Project [device_uid#14, grid_id#714L]
+- Join Inner, org.apache.spark.sql.sedona_sql.expressions.ST_Contains
:- Repartition 50000, true
: +- Project [device_uid#14, pythonUDF0#752 AS geometry#479]
: +- BatchEvalPython [convert_wkt(location_longitude#119,
location_latitude#118)#478], [pythonUDF0#752]
: +- Project [device_uid#14, location_latitude#118,
location_longitude#119]
: +- Relation
[device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,...
208 more fields] parquet
+- Project [grid_id#714L, geometry#725]
+- Relation
[grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
parquet
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_uid#14, grid_id#714L]
+- RangeJoin org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT ,
org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT , WITHIN
:- Exchange RoundRobinPartitioning(50000), REPARTITION_BY_NUM, [id=#60]
: +- Project [device_uid#14, pythonUDF0#752 AS geometry#479]
: +- BatchEvalPython [convert_wkt(location_longitude#119,
location_latitude#118)#478], [pythonUDF0#752]
: +- Project [device_uid#14, location_latitude#118,
location_longitude#119]
: +- FileScan parquet
[device_uid#14,location_latitude#118,location_longitude#119,region#242,year#243,month#244,day#245]
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1
paths)[s3://000crcdatalake-hanxi/tutela_ca_all], PartitionFilters: [],
PushedFilters: [], ReadSchema:
struct<device_uid:string,location_latitude:double,location_longitude:double>
+- Project [grid_id#714L, geometry#725]
+- FileScan parquet
[grid_id#714L,geometry#725,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1
paths)[s3://000crcdatalake/ised_grid], PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<grid_id:bigint,geometry:string>