[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece

2018-04-20 Thread Dmitrii Bundin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitrii Bundin updated SPARK-22823:
---
Affects Version/s: 2.3.0

> Race Condition when reading Broadcast shuffle input. Failed to get broadcast 
> piece
> --
>
> Key: SPARK-22823
> URL: https://issues.apache.org/jira/browse/SPARK-22823
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.1, 2.2.1, 2.3.0
>Reporter: Dmitrii Bundin
>Priority: Major
>
> It seems we have a race condition when reading shuffle input that is a 
> broadcast rather than direct. To read broadcast MapStatuses at 
> {code:java}
> org.apache.spark.shuffle.BlockStoreShuffleReader::read()
> {code}
> we submit a message of type GetMapOutputStatuses(shuffleId) to be 
> executed in MapOutputTrackerMaster's pool, which in turn ends up creating a 
> new broadcast at
> {code:java}
> org.apache.spark.MapOutputTracker::serializeMapStatuses
> {code}
> if the size of the serialized statuses exceeds minBroadcastSize.
> As a result, the newly created broadcast may be registered in the driver's 
> BlockManagerMasterEndpoint only after some executor has already asked the 
> driver for the broadcast piece location.
> In our project we get the following exception on a regular basis:
> {code:java}
> java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1280)
> at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
> at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
> at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
> at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
> at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
> at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
> at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:598)
> at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:660)
> at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:203)
> at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:142)
> at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
> {code}
> This exception appears when we try to read a broadcast piece. To do this 
> we need to fetch the broadcast piece location from the driver via
> {code:java}
> org.apache.spark.storage.BlockManagerMaster::getLocations(blockId: BlockId)
> {code}
> The driver responds with an empty list of l

[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece

2017-12-18 Thread Dmitrii Bundin (JIRA)

Dmitrii Bundin updated SPARK-22823:
---
Description: 
It seems we have a race condition when reading shuffle input that is a 
broadcast rather than direct. To read broadcast MapStatuses at 
{code:java}
org.apache.spark.shuffle.BlockStoreShuffleReader::read()
{code}

we submit a message of type GetMapOutputStatuses(shuffleId) to be executed 
in MapOutputTrackerMaster's pool, which in turn ends up creating a new 
broadcast at

{code:java}
org.apache.spark.MapOutputTracker::serializeMapStatuses
{code}

if the size of the serialized statuses exceeds minBroadcastSize.
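A minimal sketch of the size-based choice described above, assuming the map statuses can be stood in for by an array of per-map output sizes. The class `MapStatusEncodingSketch`, its `Mode` enum and both methods are hypothetical; they do not reproduce Spark's `serializeMapStatuses`, which works on real `MapStatus` objects, and the exact boundary comparison in Spark may differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only; not Spark code.
final class MapStatusEncodingSketch {
    // DIRECT: bytes travel inside the RPC reply. BROADCAST: only a handle does.
    enum Mode { DIRECT, BROADCAST }

    // Serializes per-map output sizes (a stand-in for real MapStatus objects)
    // as a 4-byte count followed by one 8-byte value per map output.
    static byte[] serialize(long[] mapOutputSizes) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(mapOutputSizes.length);
            for (long size : mapOutputSizes) {
                out.writeLong(size);
            }
        } catch (IOException e) {
            // ByteArrayOutputStream does not fail, but DataOutputStream declares IOException.
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Payloads larger than the threshold are published as a broadcast, so the
    // executor must later fetch the broadcast pieces on its own.
    static Mode chooseMode(byte[] serialized, int minBroadcastSize) {
        return serialized.length > minBroadcastSize ? Mode.BROADCAST : Mode.DIRECT;
    }
}
```

For example, three map outputs serialize to 28 bytes in this encoding and stay `DIRECT` under a 512-byte threshold, while 100 map outputs (804 bytes) would go out as a broadcast. Only the `BROADCAST` path involves the second, executor-initiated fetch where the race can occur.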

As a result, the newly created broadcast may be registered in the driver's 
BlockManagerMasterEndpoint only after some executor has already asked the 
driver for the broadcast piece location.
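The interleaving can be modeled with a toy location table, assuming a single driver-side map from block id to locations that returns an empty list for unknown ids. `BroadcastRaceSketch`, `LocationRegistry` and `Observation` are hypothetical names, not Spark classes; a `CountDownLatch` forces the bad ordering (lookup before registration) so the outcome is deterministic rather than timing-dependent.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the race; none of these classes exist in Spark.
final class BroadcastRaceSketch {

    // Toy stand-in for the driver's table of block id -> known locations.
    static final class LocationRegistry {
        private final Map<String, List<String>> locations = new ConcurrentHashMap<>();

        void register(String blockId, String location) {
            locations.computeIfAbsent(blockId, k -> new CopyOnWriteArrayList<>()).add(location);
        }

        // An unknown block yields an empty list, not an error.
        List<String> getLocations(String blockId) {
            return locations.getOrDefault(blockId, Collections.emptyList());
        }
    }

    // What the executor saw before and after the driver registered the piece.
    static final class Observation {
        final List<String> beforeRegistration;
        final List<String> afterRegistration;

        Observation(List<String> before, List<String> after) {
            this.beforeRegistration = before;
            this.afterRegistration = after;
        }
    }

    static Observation runRacyInterleaving() {
        LocationRegistry registry = new LocationRegistry();
        CountDownLatch lookupDone = new CountDownLatch(1);
        String pieceId = "broadcast_176_piece0";

        // "Driver" thread: registration is held back until the lookup has happened.
        Thread driver = new Thread(() -> {
            try {
                lookupDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            registry.register(pieceId, "driver");
        });
        driver.start();

        // "Executor" side: it already holds the broadcast id and asks for locations.
        List<String> before = registry.getLocations(pieceId);
        lookupDone.countDown();

        try {
            driver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for driver thread", e);
        }
        // Once registration has happened, the same lookup succeeds.
        List<String> after = registry.getLocations(pieceId);
        return new Observation(before, after);
    }
}
```

The same lookup returns an empty list before registration and `["driver"]` afterwards, which is the window the report describes.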

In our project we get the following exception on a regular basis:

{code:java}
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1280)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:598)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:660)
at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:203)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:142)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
{code}

This exception appears when we try to read a broadcast piece. To do this we 
need to fetch the broadcast piece location from the driver via
{code:java}
org.apache.spark.storage.BlockManagerMaster::getLocations(blockId: BlockId)
{code}
The driver responds with an empty list of locations, and fetching the 
broadcast piece fails with the exception listed above.
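The failure path can be sketched as follows, assuming piece ids follow the `broadcast_<id>_piece<n>` pattern visible in the log and that the driver lookup is passed in as a plain function. `PieceFetchSketch` and its methods are hypothetical; this sketch simply visits pieces in index order and uses the first location it is given, which may not match how Spark orders pieces or chooses among locations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of an executor-side piece lookup; not Spark code.
final class PieceFetchSketch {
    // Builds piece ids in the broadcast_<id>_piece<n> form seen in the log.
    static String pieceId(long broadcastId, int piece) {
        return "broadcast_" + broadcastId + "_piece" + piece;
    }

    // Asks the location lookup (a stand-in for the driver call) about every piece
    // and fails on the first piece that has no known location.
    static List<String> locatePieces(long broadcastId, int numPieces,
                                     Function<String, List<String>> getLocations) {
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < numPieces; i++) {
            String id = pieceId(broadcastId, i);
            List<String> locs = getLocations.apply(id);
            if (locs.isEmpty()) {
                // An empty answer (e.g. the broadcast is not registered yet) is fatal here.
                throw new IllegalStateException(
                    "Failed to get " + id + " of broadcast_" + broadcastId);
            }
            chosen.add(locs.get(0));
        }
        return chosen;
    }
}
```

With a lookup that always returns an empty list, `locatePieces(176, 1, ...)` fails with the message "Failed to get broadcast_176_piece0 of broadcast_176", the same text as in the stack trace above.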

  was:
It seems we have a race condition when trying to read shuffle input which is a 
broadcast, not direct. To read broadcast MapStatuses at 
{code:java}
org.apache.spark.shuffle.BlockStoreShuffleReader::read()
{code}

we submit a message of the type GetMapOutputStatuses(shuffleId) to be executed 
in MapOutputTrackerMaster's pool which in turn ends up in creating a new 
broadcast in

{code:java}
org.apache.spark.MapOutputTracker::serializeMapStatuses
{code}

if the received statuses bytes more than minBroadcastSize.

So registering the newly created broadcast in 

[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece

2017-12-18 Thread Dmitrii Bundin (JIRA)

Dmitrii Bundin updated SPARK-22823:
---
Priority: Major  (was: Minor)

> Race Condition when reading Broadcast shuffle input. Failed to get broadcast 
> piece
> --
>
> Key: SPARK-22823
> URL: https://issues.apache.org/jira/browse/SPARK-22823
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.1, 2.2.1
>Reporter: Dmitrii Bundin

[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece

2017-12-18 Thread Dmitrii Bundin (JIRA)

Dmitrii Bundin updated SPARK-22823:
---
Summary: Race Condition when reading Broadcast shuffle input. Failed to get 
broadcast piece  (was: Race Condition when reading Broadcast shuffle input)

> Race Condition when reading Broadcast shuffle input. Failed to get broadcast 
> piece
> --
>
> Key: SPARK-22823
> URL: https://issues.apache.org/jira/browse/SPARK-22823
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.1, 2.2.1
>Reporter: Dmitrii Bundin
>Priority: Minor

[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input

2017-12-18 Thread Dmitrii Bundin (JIRA)

Dmitrii Bundin updated SPARK-22823:
---
Component/s: Spark Core

> Race Condition when reading Broadcast shuffle input
> ---
>
> Key: SPARK-22823
> URL: https://issues.apache.org/jira/browse/SPARK-22823
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.1, 2.2.1
>Reporter: Dmitrii Bundin
>Priority: Minor

[jira] [Updated] (SPARK-22823) Race Condition when reading Broadcast shuffle input

2017-12-18 Thread Dmitrii Bundin (JIRA)

Dmitrii Bundin updated SPARK-22823:
---
Description: 
It seems we have a race condition when reading shuffle input that is a 
broadcast rather than direct. To read broadcast MapStatuses at 
{code:java}
org.apache.spark.shuffle.BlockStoreShuffleReader::read()
{code}

we submit a message of type GetMapOutputStatuses(shuffleId) to be executed 
in MapOutputTrackerMaster's pool, which in turn ends up creating a new 
broadcast in

{code:java}
org.apache.spark.MapOutputTracker::serializeMapStatuses
{code}

if the size of the serialized statuses exceeds minBroadcastSize.

As a result, the newly created broadcast may be registered in the driver's 
BlockManagerMasterEndpoint only after some executor has already asked the 
driver for the broadcast piece location.

In our project we get the following exception on a regular basis:

{code:java}
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1280)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:598)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:660)
at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:203)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:142)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
{code}

This exception appears when we try to read a broadcast piece. To do this we 
need to fetch the broadcast piece location from the driver via
{code:java}
org.apache.spark.storage.BlockManagerMaster::getLocations(blockId: BlockId)
{code}
The driver responds with an empty list of locations, and fetching the 
broadcast piece fails with the exception listed above.

  was:
It seems we have a race condition when trying to read shuffle input which is a 
broadcast, not direct. To read broadcast MapStatuses at 
{code:java}
org.apache.spark.shuffle.BlockStoreShuffleReader::read()
{code}

we submit a message of the type GetMapOutputStatuses(shuffleId) to be executed 
in MapOutputTrackerMaster's pool which in turn ends up in creating a new 
broadcast in

{code:java}
org.apache.spark.MapOutputTracker::serializeMapStatuses
{code}

if the receive statuses bytes more than minBroadcastSize.

So registering the newly created broadcast in t