Re: help understanding physical plan

2019-08-16 Thread Marcelo Valle
Thanks, Tianlang. I saw the DAG on YARN, but what really solved my problem
was adding intermediate steps and evaluating them eagerly to find out where
the bottleneck was.
My process now runs in 6 min. :D
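In Spark, one common way to evaluate an intermediate DataFrame eagerly is to call `persist()` on it and then an action such as `count()`, timing each step separately. The thread doesn't say exactly how it was done here. The sketch below does not assume a Spark cluster. It is a plain-Python analogy in which lazy generator stages stand in for unevaluated DataFrames and `list()` forces each one. The helper `time_stages_eagerly` and the stage names are hypothetical.

```python
import time
from typing import Callable, Dict, Iterable, List, Tuple

# A stage is a (name, function) pair; the function lazily transforms its input,
# the way a Spark transformation only describes work until an action runs.
Stage = Tuple[str, Callable[[Iterable], Iterable]]


def time_stages_eagerly(source: Iterable, stages: List[Stage]) -> Tuple[list, Dict[str, float]]:
    """Force each stage to completion before starting the next, timing each one.

    list() plays the role of persist() + count() in Spark: the stage's work is
    done at that point, so its cost is attributed to it instead of being fused
    into one long pipeline that only runs at the very end.
    """
    timings: Dict[str, float] = {}
    data = list(source)
    for name, fn in stages:
        start = time.perf_counter()
        data = list(fn(data))  # eager evaluation of this stage only
        timings[name] = time.perf_counter() - start
    return data, timings


if __name__ == "__main__":
    # Hypothetical stages, for illustration only.
    stages = [
        ("keep_even", lambda rows: (r for r in rows if r % 2 == 0)),
        ("square", lambda rows: (r * r for r in rows)),
    ]
    result, timings = time_stages_eagerly(range(10), stages)
    print(result)  # [0, 4, 16, 36, 64]
    print("slowest stage:", max(timings, key=timings.get))
```

The trade-off is the same as in Spark. Forcing every step costs extra passes and storage, but it turns one opaque end-to-end runtime into per-step numbers.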

Thanks for the help.

[]s

On Thu, 15 Aug 2019 at 07:25, Tianlang wrote:

> Hi,
>
> Maybe you can look at the Spark UI. The physical plan doesn't contain any
> timing information.

Re: help understanding physical plan

2019-08-15 Thread Tianlang

Hi,

Maybe you can look at the Spark UI. The physical plan doesn't contain any 
timing information.
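The same timing information can also be pulled programmatically. This is a minimal sketch. Spark's monitoring REST API exposes `/applications/[app-id]/stages`, and its entries include `executorRunTime` (total executor time for the stage, in milliseconds). The base URL is an assumption to adapt, because it depends on how the UI is exposed: directly on the driver (port 4040), through the YARN proxy, or through the history server. `fetch_stages` and `slowest_stages` are hypothetical helper names.

```python
import json
import urllib.request
from typing import Any, Dict, List


def fetch_stages(base_url: str, app_id: str) -> List[Dict[str, Any]]:
    """Fetch per-stage metrics from Spark's monitoring REST API.

    base_url is an assumption, e.g. "http://<driver-host>:4040/api/v1" for a
    running application, or the history server's ".../api/v1" endpoint.
    """
    url = f"{base_url}/applications/{app_id}/stages"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def slowest_stages(stages: List[Dict[str, Any]], n: int = 5) -> List[Dict[str, Any]]:
    """Return the n stages with the largest total executor run time (ms)."""
    ranked = sorted(stages, key=lambda s: s.get("executorRunTime", 0), reverse=True)
    return [
        {
            "stageId": s.get("stageId"),
            "name": s.get("name"),
            "executorRunTime_ms": s.get("executorRunTime", 0),
            "numTasks": s.get("numTasks"),
        }
        for s in ranked[:n]
    ]


# Usage (host and application id are placeholders, not values from this thread):
#   stages = fetch_stages("http://<driver-host>:4040/api/v1", "<app-id>")
#   for s in slowest_stages(stages):
#       print(s)
```

Sorting by total executor time points at the expensive stages. The stage page in the UI then shows whether that time is spread evenly or concentrated in a few straggler tasks.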


On 2019/8/13 at 10:45 PM, Marcelo Valle wrote:

help understanding physical plan

2019-08-13 Thread Marcelo Valle
Hi,

I have a job running on AWS EMR. It's basically a join between 2 tables
(Parquet files on S3), one fairly large (around 50 GB) and the other small
(less than 1 GB).
The small table is the result of other operations, but it is a DataFrame
persisted with `.persist(StorageLevel.MEMORY_AND_DISK_SER)`, and a count on
this DataFrame finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan below. While
most of my jobs on large amounts of data take at most 1 h on this cluster, this
one takes almost 1 day to complete.

What could I be doing wrong? I am trying to analyze the plan, but I can't
find anything that justifies the slowness. It has 2 shuffles followed by a
zip, but other jobs have similar things and they are not that slow.

Could anyone point me to possible actions I could take to investigate this?

Thanks,
Marcelo.

== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493, SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702, USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ), coalesce(SENDER_RECORDING_IDENTIFIER#1499, )], [coalesce(USAGE_AGGREGATED_METADATA_ID#356, ), coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight, ((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493) && (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
   :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493, SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
   :     +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497, WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499, RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502, USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
   :           +- *(2) Project [ID#328 AS USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498, uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null, artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS USAGE_AGGREGATED_METADATA_HASH#1513]
   :              +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ), coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ), coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )], [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ), coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner, BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) && (track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=> artist_name_1#1422)) && (work_writer_names#293 <=> work_writer_names_1#1423))
   :                 :- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), coalesce(input[2, string, true], ), coalesce(input[3, string, true], ), coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   :                 :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419, iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421, artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS work_writer_names_1#1423]
   :                 :     +- *(1) Filter isnotnull(ID#328)
   :                 :        +- InMemoryTableScan [ID#328, artist_name#292, isrc#289, iswc#290, track_name#291, work_writer_names#293], [isnotnull(ID#328)]
   :                 :              +- InMemoryRelation [ID#328, isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293], StorageLevel(disk, memory, 1 replicas)
   :                 :                    +- *(2) Project [ID#328, isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
   :                 :                       +- *(2) BroadcastHashJoin [coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331, ), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )], [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ), coalesce(substring(artist_name#292, 0, 1000), ), coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (isrc#289 <=> ISRC#329) && (iswc#290 <=> ISWC#330)) && (track_name#291 <=> RECORDING_TITLE#331)) && (substring(artist_name#292, 0, 1000) <=> RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=> WORK_WRITERS#333))
   :                 :                          :- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), coalesce(input[2, string, true], ), coalesce(input[3, string, true], ), coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   :                 :                          :  +- *(1) Project [ID#328, ISRC#329, ISWC#330,
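The plan's top node is a BroadcastHashJoin of type LeftAnti with BuildRight. As printed, the right side is hashed on `coalesce(col, '')` for each key column, and the join condition re-checks every key with the null-safe `<=>` operator. The plain-Python sketch below illustrates what that node computes. It assumes rows are dicts of string-or-None values, and the function names are hypothetical. It is not Spark's implementation.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Optional[str]]


def null_safe_eq(a: Optional[str], b: Optional[str]) -> bool:
    """SQL <=>: true if both are NULL, or both are non-NULL and equal."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def hash_key(row: Row, keys: Sequence[str]) -> Tuple[str, ...]:
    """coalesce(col, '') for every key column, as in the plan's hash keys."""
    return tuple(row[k] if row[k] is not None else "" for k in keys)


def left_anti_null_safe(left: List[Row], right: List[Row], keys: Sequence[str]) -> List[Row]:
    """Keep left rows with no right row matching on all keys under <=>."""
    # "BuildRight": hash the (small) right side into buckets.
    buckets: Dict[Tuple[str, ...], List[Row]] = defaultdict(list)
    for r in right:
        buckets[hash_key(r, keys)].append(r)

    out = []
    for row in left:  # stream the left side and probe the hash table
        candidates = buckets.get(hash_key(row, keys), [])
        # NULL and '' share a bucket, so each candidate is re-checked with <=>.
        if not any(all(null_safe_eq(row[k], c[k]) for k in keys) for c in candidates):
            out.append(row)
    return out


if __name__ == "__main__":
    left = [{"id": "1", "uri": "a"}, {"id": "2", "uri": None},
            {"id": "3", "uri": ""}, {"id": "4", "uri": "d"}]
    right = [{"id": "1", "uri": "a"}, {"id": "2", "uri": None},
             {"id": "3", "uri": None}]
    print([r["id"] for r in left_anti_null_safe(left, right, ["id", "uri"])])  # ['3', '4']
```

Row "3" survives because `'' <=> NULL` is false, even though both values hash to the same coalesced key. The mechanism has one side effect. If many build-side rows collapse onto one coalesced key (for example, rows whose join columns are mostly NULL or empty), every probe into that bucket has to test each candidate. The thread doesn't establish whether that was the bottleneck in this job.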