[ 
https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960953#comment-15960953
 ] 

Quentin Auge edited comment on SPARK-20227 at 4/7/17 3:24 PM:
--------------------------------------------------------------

Well, you were right to ask. After further investigation, it seems the job does 
not hang forever, but takes much longer to finish when n is large enough.

I made a mistake when I mentioned the job finishes in a sensible amount of time 
for n = 20. Really, it starts to hang badly from n = 12 (so 11 successive 
aggregations).

See the following time:
!https://image.ibb.co/fxRL85/figure_1.png!
n on x axis, time for the job to complete in seconds on y axis.
The job takes as much as 1 hour and 47 minutes to complete for n = 14.

The significant amount of time the job takes from n = 12 is spent after this 
message:
{code}
17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because 
it has been idle for 60 seconds (new desired total will be 0)
17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling 
executor 1.
17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0)
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 
from BlockManagerMaster.
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager 
BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None)
17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in 
removeExecutor
17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on 
ip-172-30-0-149.ec2.internal killed by driver.
17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been 
removed (new total is 0)
{code}

Right after that, the job starts again and completes.


was (Author: quentin):
Well, you were right to ask. After further investigation, it seems the job does 
not hang forever, but takes much longer to finish when n is large enough.

I made a mistake when I mentioned the job finished in a sensible amount of time 
for n = 20. Really, it starts to hang badly from n = 12 (so 11 successive 
aggregations).

See the following time:
!https://image.ibb.co/fxRL85/figure_1.png!
n on x axis, time for the job to complete in seconds on y axis.
The job takes as much as 1 hour and 47 minutes to complete for n = 14.

The significant amount of time the job takes from n = 12 is spent after this 
message:
{code}
17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because 
it has been idle for 60 seconds (new desired total will be 0)
17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling 
executor 1.
17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0)
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 
from BlockManagerMaster.
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager 
BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None)
17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in 
removeExecutor
17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on 
ip-172-30-0-149.ec2.internal killed by driver.
17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been 
removed (new total is 0)
{code}

Right after that, the job starts again and completes.

> Job hangs when joining a lot of aggregated columns
> --------------------------------------------------
>
>                 Key: SPARK-20227
>                 URL: https://issues.apache.org/jira/browse/SPARK-20227
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: AWS emr-5.4.0, master: m4.xlarge, core: 4 m4.xlarge
>            Reporter: Quentin Auge
>
> I'm trying to replace a lot of different columns in a dataframe with 
> aggregates of themselves, and then join the resulting dataframe.
> {code}
> # Create a dataframe with 1 row and 50 columns
> n = 50
> df = sc.parallelize([Row(*range(n))]).toDF()
> cols = df.columns
> # Replace each column values with aggregated values
> window = Window.partitionBy(cols[0])
> for col in cols[1:]:
>     df = df.withColumn(col, sum(col).over(window))
> # Join
> other_df = sc.parallelize([Row(0)]).toDF()
> result = other_df.join(df, on = cols[0])
> result.show()
> {code}
> Spark hangs forever when executing the last line. The strange thing is, it 
> depends on the number of columns. Spark does not hang for n = 5, 10, or 20 
> columns. For n = 50 and beyond, it does.
> {code}
> 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because 
> it has been idle for 60 seconds (new desired total will be 0)
> 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling 
> executor 1.
> 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0)
> 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 
> 1 from BlockManagerMaster.
> 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager 
> BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None)
> 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in 
> removeExecutor
> 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on 
> ip-172-30-0-149.ec2.internal killed by driver.
> 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has 
> been removed (new total is 0)
> {code}
> All executors are inactive and thus killed after 60 seconds, the master 
> spends some CPU on a process that hangs indefinitely, and the workers are 
> idle.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to