Hi,
Maybe you can look at the Spark UI. The physical plan contains no timing
information.
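Since the plan builds a BroadcastHashJoin (BuildRight) on the small table, one thing that might be worth checking (a guess, not a diagnosis) is the broadcast threshold: Spark broadcasts a join side when its estimated size is below `spark.sql.autoBroadcastJoinThreshold`, and broadcasting a side that is actually large can stall a job. A spark-defaults.conf sketch:

```properties
# Estimated-size cutoff below which Spark broadcasts a join side
# (default 10485760 bytes = 10 MB).
# Set to -1 to disable automatic broadcast joins entirely.
spark.sql.autoBroadcastJoinThreshold  10485760
```

The Spark UI's SQL tab also shows per-operator metrics (rows, build time, broadcast size) that the printed plan does not.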
On 2019/8/13, 10:45 PM, Marcelo Valle wrote:
Hi,
I have a job running on AWS EMR. It's basically a join between 2
tables (Parquet files on S3), one somewhat large (around 50 GB) and
the other small (less than 1 GB).
The small table is the result of other operations, but it is a
dataframe persisted with `.persist(StorageLevel.MEMORY_AND_DISK_SER)`,
and a count on this dataframe finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan below.
While most of my jobs on large amounts of data take at most 1 hour on
this cluster, this one takes almost 1 day to complete.
What could I be doing wrong? I am trying to analyze the plan, but I
can't find anything that justifies the slowness. It has 2 shuffles
followed by a zip, but other jobs have similar things and they are not
that slow.
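For reference, the LEFT_ANTI semantics here: keep only the rows of the left table whose join key has no match on the right. A toy illustration in plain Python, with made-up rows rather than the real tables:

```python
# Toy illustration of left-anti-join semantics (hypothetical data,
# not the real tables from the job above).
left = [
    ("uam-1", "rec-1"),
    ("uam-2", "rec-2"),
    ("uam-3", "rec-3"),
]
# Join keys present on the right side.
right_keys = {("uam-1", "rec-1"), ("uam-3", "rec-3")}

# A left anti join keeps only the left rows with no match on the right.
anti = [row for row in left if row not in right_keys]
print(anti)  # [('uam-2', 'rec-2')]
```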
Could anyone point me to possible actions I could take to investigate
this?
Thanks,
Marcelo.
== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493,
), coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
[coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
((USAGE_AGGREGATED_METADATA_ID#356 <=>
USAGE_AGGREGATED_METADATA_ID#1493) && (SENDER_RECORDING_IDENTIFIER#357
<=> SENDER_RECORDING_IDENTIFIER#1499))
:- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
: +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493,
ISRC#1494, ISWC#1495, RECORDING_TITLE#1496,
RECORDING_DISPLAY_ARTIST#1497, WORK_WRITERS#1498,
SENDER_RECORDING_IDENTIFIER#1499, RECORDING_VERSION_TITLE#1500,
WORK_TITLE#1501, CONTENT_TYPE#1502,
USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1
replicas)
: +- *(2) Project [ID#328 AS
USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS
WORK_WRITERS#1498, uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null
AS RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
USAGE_AGGREGATED_METADATA_HASH#1513]
: +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291,
), coalesce(artist_name#292, ), coalesce(work_writer_names#293, )],
Inner, BuildLeft, (((((isrc#289 <=> isrc_1#1419) && (iswc#290 <=>
iswc_1#1420)) && (track_name#291 <=> track_name_1#1421)) &&
(artist_name#292 <=> artist_name_1#1422)) && (work_writer_names#293
<=> work_writer_names_1#1423))
: :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true],
), coalesce(input[4, string, true], ), coalesce(input[5, string,
true], )))
: : +- *(1) Project [ID#328, isrc#289 AS
isrc_1#1419, iswc#290 AS iswc_1#1420, track_name#291 AS
track_name_1#1421, artist_name#292 AS artist_name_1#1422,
work_writer_names#293 AS work_writer_names_1#1423]
: : +- *(1) Filter isnotnull(ID#328)
: : +- InMemoryTableScan [ID#328,
artist_name#292, isrc#289, iswc#290, track_name#291,
work_writer_names#293], [isnotnull(ID#328)]
: : +- InMemoryRelation [ID#328,
isrc#289, iswc#290, track_name#291, artist_name#292,
work_writer_names#293], StorageLevel(disk, memory, 1 replicas)
: : +- *(2) Project [ID#328,
isrc#289, iswc#290, track_name#291, artist_name#292,
work_writer_names#293]
: : +- *(2) BroadcastHashJoin
[coalesce(ISRC#329, ), coalesce(ISWC#330, ),
coalesce(RECORDING_TITLE#331, ),
coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333,
)], [coalesce(isrc#289, ), coalesce(iswc#290, ),
coalesce(track_name#291, ), coalesce(substring(artist_name#292, 0,
1000), ), coalesce(work_writer_names#293, )], RightOuter, BuildLeft,
(((((isrc#289 <=> ISRC#329) && (iswc#290 <=> ISWC#330)) &&
(track_name#291 <=> RECORDING_TITLE#331)) &&
(substring(artist_name#292, 0, 1000) <=>
RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=>
WORK_WRITERS#333))
: : :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true],
), coalesce(input[4, string, true], ), coalesce(input[5, string,
true], )))
: : : +- *(1) Project
[ID#328, ISRC#329, ISWC#330, RECORDING_TITLE#331,
RECORDING_DISPLAY_ARTIST#332, WORK_WRITERS#333]
: : : +- *(1) Filter
((isnull(WORK_TITLE#334) && isnull(RECORDING_VERSION_TITLE#335)) &&
(CONTENT_TYPE#336 <=> SOUND))
: : : +- *(1)
FileScan parquet
[ID#328,ISRC#329,ISWC#330,RECORDING_TITLE#331,RECORDING_DISPLAY_ARTIST#332,WORK_WRITERS#333,WORK_TITLE#334,RECORDING_VERSION_TITLE#335,CONTENT_TYPE#336]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua...,
PartitionFilters: [], PushedFilters: [IsNull(WORK_TITLE),
IsNull(RECORDING_VERSION_TITLE), EqualNullSafe(CONTENT_TYPE,SOUND)],
ReadSchema:
struct<ID:string,ISRC:string,ISWC:string,RECORDING_TITLE:string,RECORDING_DISPLAY_ARTIST:string,W...
: : +- *(2) FileScan
parquet
[isrc#289,iswc#290,track_name#291,artist_name#292,work_writer_names#293]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<isrc:string,iswc:string,track_name:string,artist_name:string,work_writer_names:string>
: +- *(2) FileScan parquet
[uri#286,isrc#289,iswc#290,track_name#291,artist_name#292,work_writer_names#293]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<uri:string,isrc:string,iswc:string,track_name:string,artist_name:string,work_writer_names:...
+- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ),
coalesce(input[1, string, true], )))
+- *(1) FileScan parquet
[USAGE_AGGREGATED_METADATA_ID#356,SENDER_RECORDING_IDENTIFIER#357]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<USAGE_AGGREGATED_METADATA_ID:string,SENDER_RECORDING_IDENTIFIER:string>
--
TianlangStudio