[ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
---------------------------------
    Description: 
We encountered an issue in Spark 2.1. The exception is as follows:


{noformat}
        Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 13819, xxxx , executor 8): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
        at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
        at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
        at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
        at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
        at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
        at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs. It seems the broadcast id for the mapStatuses is sent to the executor, but the broadcast is invalidated by the driver immediately afterwards, before the executor actually fetches it.

This can be described as follows. Say we have an rdd1:
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save
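
For concreteness, here is a minimal runnable Scala sketch of that job shape (the input/output paths and the map functions are placeholders for illustration, not from the original job):

{code:scala}
import org.apache.spark.sql.SparkSession

// Minimal sketch of the job shape described above.
// Paths and map functions are made up for illustration.
object RepartitionJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repartition-join-sketch").getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.textFile("hdfs:///tmp/input")          // placeholder input
    val rdd2 = rdd1.repartition(100)                     // stage 0 (shuffle map stage)
    val rdd3 = rdd2.map(line => (line, 1))               // consumed by stage 1
    val rdd4 = rdd2.map(line => (line, line.length))     // consumed by stage 2

    // Both downstream stages read stage 0's shuffle output;
    // the join then produces the result that gets written out.
    rdd3.join(rdd4).saveAsTextFile("hdfs:///tmp/output") // placeholder output

    spark.stop()
  }
}
{code}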

When a FetchFailedException happens in stage 1, stage 0 and stage 1 are resubmitted and re-executed, but stage 2 is still running. Its tasks fetch mapStatuses from the driver, yet the cached mapStatuses broadcast is invalidated as soon as the tasks of the stage 0 retry (stage 0.1) complete and the driver registers their map outputs (registerMapOutput).
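
To make the ordering concrete, below is a minimal, self-contained Scala sketch of the same pattern (this is not Spark's code; all names are made up): a handle to a cached blob is handed to a reader, a concurrent update invalidates the cache entry, and the reader's later fetch fails.

{code:scala}
import scala.collection.concurrent.TrieMap

// Self-contained illustration of the race, NOT Spark internals.
// "driverCache" stands in for the cached mapStatuses broadcast,
// invalidate() for the re-registration of map outputs after the stage 0 retry,
// and fetch() for a still-running stage 2 task reading the broadcast it was told about.
object BroadcastRaceSketch {
  private val driverCache = TrieMap[String, Array[Byte]]()

  def publish(id: String, payload: Array[Byte]): String = {
    driverCache.put(id, payload)
    id // the "broadcast id" handed out to executors
  }

  def invalidate(id: String): Unit = driverCache.remove(id)

  def fetch(id: String): Array[Byte] =
    driverCache.getOrElse(id, throw new RuntimeException(s"Failed to get $id"))

  def main(args: Array[String]): Unit = {
    val id = publish("broadcast_9", Array[Byte](1, 2, 3)) // handle sent to a stage 2 task
    invalidate(id) // stage 0.1 completes; driver drops the cached broadcast
    fetch(id)      // the task's late fetch now fails, as in the trace above
  }
}
{code}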

I checked the master branch; it seems the correctness issues around `repartition` have been fixed there, but I think this issue may still exist.

Some screenshots:
!https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
!https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!







> Application failed due to failed to get MapStatuses broadcast
> -------------------------------------------------------------
>
>                 Key: SPARK-30849
>                 URL: https://issues.apache.org/jira/browse/SPARK-30849
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: liupengcheng
>            Priority: Major
>         Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>



