[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14603223#comment-14603223 ] Thomas Graves commented on SPARK-1391: -- duping this to SPARK-6235. If something is missing lets add it there. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: BlockLimitDesign.pdf, SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.ap
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14344320#comment-14344320 ] Reynold Xin commented on SPARK-1391: - I absolutely agree that better error messages is critical. - I'm saying it is rare that somebody wants to cache > 2gb partition and also use replication. - We can fix TorrentBroadcast without fixing block upload, because TorrentBroadcast splits data into chunks anyway. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: BlockLimitDesign.pdf, SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIR
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14344286#comment-14344286 ] Imran Rashid commented on SPARK-1391: - [~rxin] Sure thing, I can break it into multiple pieces. though honestly, if we don't think we'll bother fixing some of these 2gb limits, step 0 should be putting in sane error messages on all the different cases where you can run into the 2gb limit. Right now the errors are extremely confusing. If we want to support only some very limited set of functionality, then we might not even need to have LargeByteBuffer interact at all w/ ManagedBuffer -- if we only want to support cached partitions, with no replication and no remote fetches, then I'm pretty sure the relevant paths in BlockManager never involve a ManagedBuffer. I'm not sure I understand what you mean that "Uploading blocks > 2G is extremely rare". Are you saying that nobody would want to cache a partition > 2gb? That nobody uses replication when caching? Or that we don't need to support the combination? Also if you have a broadcast variable over 2gb, TorrentBroadcast will store it in all one block on the driver. It breaks it into smaller blocks when sending it executors, but not on the driver -- perhaps that could be changed to always break into smaller blocks on the driver as well. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: BlockLimitDesign.pdf, SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessa
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343900#comment-14343900 ] Reynold Xin commented on SPARK-1391: @squito if you want to attempt something this large and core to the whole engine, it would be better to do this incrementally, especially when this is a part of the code that you are less familiar with. I'd suggest breaking this task down in the following way, and getting feedback incrementally as well. 1. LargeByteBuffer interface. (This alone would deserve its own design doc and focus on how it integrates with ManagedBuffer) 2. Block storage 3. Block fetching 4. Block upload Uploading blocks > 2G is extremely rare, as it is rarely used outside of streaming, and streaming data blocks are usually small. It would also be much simpler to deal with upload if we change it to sending a msg to the other end and let the other end download the blocks instead. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: BlockLimitDesign.pdf, SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.Conne
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343817#comment-14343817 ] Apache Spark commented on SPARK-1391: - User 'squito' has created a pull request for this issue: https://github.com/apache/spark/pull/4857 > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: BlockLimitDesign.pdf, SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14338648#comment-14338648 ] Imran Rashid commented on SPARK-1391: - The one complication here comes from the network transfer required by replication. It we ignore the {{NioBlockTransferService}} for now and just look at {{NettyBlockTransferService}}, the existing behavior is: 1. replication results in a request to [{{NettyBlockTransferService#uploadBlocks}} | https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L126], which sends an {{UploadBlock}} msg to a peer. The {{UploadBlock}} message contains the full payload, which is limited to 2GB currently. 2. The message is received by [{{NettyBlockRpcServer}} | https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L62] where it is simply deserialized and inserted into the local block manager. I'm thinking we could break a block apart into multiple messages, eg. {{UploadPartialBlock}}, with each message limited to 2GB (or even less). Then {{NettyBlockRpcServer}} would queue up all the messages, and once it had received them all it would put them together and insert the block locally. My concern with that approach is robustness -- what happens if some of the {{UploadPartialBlock}} s never make it, for whatever reason? We wouldn't want {{NettyBlockRpcServer}} to simply hold on to those partial msgs in memory indefinitely. Does it make sense to introduce a timeout? When the first {{UploadPartialBlock}} is received, it would only wait for the rest of the msgs a limited time before dumping those partial blocks. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at sca
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329613#comment-14329613 ] Imran Rashid commented on SPARK-1391: - Here is a minimal program to demonstrate the problem: {code} sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](2.2e3.toInt)}.persist(StorageLevel.DISK_ONLY).count() {code} this only demonstrates the problem w/ {{DiskStore}} but a solution to this should apply to other cases if done correctly. (probably need to come up with more test cases) > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329605#comment-14329605 ] Imran Rashid commented on SPARK-1391: - [~coderplay], I assume you are no longer looking at this, right? I'm going to take a crack at this issue if you don't mind. Here is my plan, copied from SPARK-1476 (now that I've untangled those issues a little bit): I'd like to start on it, with the following very minimal goals: 1. Make it possible for blocks to be bigger than 2GB 2. Maintain performance on smaller blocks ie., I'm not going to try to do anything fancy to optimize performance of the large blocks. To that end, my plan is to 1. create a {{LargeByteBuffer}} interface, which just has the same methods we use on {{ByteBuffer}} 2. have one implementation that just wraps one ByteBuffer, and another which wraps a completely static set of {{ByteBuffer}} s (eg., if you map a 3 GB file, it'll just immediately map it to 2 {{ByteBuffer}} s, nothing fancy with only mapping the first half of the file until the second is needed etc. etc.) 3. change {{ByteBuffer}} to {{LargeByteBuffer}} in {{BlockStore}} I see that about a year back there was a lot of discussion on this in SPARK-1476, and some alternate proposals. I'd like to push forward with a POC to try to move the discussion along again. I know there was some discussion about how important this is, and whether or not we want to support it. IMO this is a big limitation and results in a lot of frustration for the users, we really need a solution for this. I could still be missing something, but I believe this should also solve SPARK-3151 > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.It
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14160356#comment-14160356 ] Gilberto Tin commented on SPARK-1391: - I am having the same issue spark 1.1.0. 6 node cluster testing small file with 1.3GB size before moving to bigger cluster bigger files. It fails on flatmap operation. 14/10/06 14:31:25 ERROR storage.BlockManagerWorker: Exception handling buffer message java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:379) at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48) at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38) at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38) at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682) at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockM
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14044427#comment-14044427 ] Frank Dai commented on SPARK-1391: -- [~coderplay] Have you solved this issue yet ? Or do you have any temporary solution to this problem? > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13966332#comment-13966332 ] Mridul Muralidharan commented on SPARK-1391: Another place where this is relevant is here : java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39) at org.apache.spark.rdd.RDD.iterator(RDD.scala:233) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) So we might want to change the abstraction from single ByteBuffer to a sequence of bytebuffers ... > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13964361#comment-13964361 ] Shivaram Venkataraman commented on SPARK-1391: -- I tried to run with this patch yesterday, but unfortunately I dont think the non-local jobs were triggered during my run. I will try to synthetically force non-local tasks the next time around to verify this. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13962563#comment-13962563 ] Shivaram Venkataraman commented on SPARK-1391: -- Sorry didn't get a chance to try this yet. Will try to do it tomorrow > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13962319#comment-13962319 ] Min Zhou commented on SPARK-1391: - Any update on your test , [~shivaram] ? > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13960100#comment-13960100 ] Shivaram Venkataraman commented on SPARK-1391: -- Thanks for the patch. I will try this out in the next couple of days and get back. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > Attachments: SPARK-1391.diff > > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > {noformat} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13959127#comment-13959127 ] Min Zhou commented on SPARK-1391: - Yes. Communication layer use ByteBuffer array to transfer messages, but the receiver will convert them back to BlockMessages where each block corresponding to one ByteBuffer, which can't be larger than 2GB. Those BlockMessages will be consumed by the connection caller in everywhere we can't control. One approach is write an CompositeByteBuffer to overcome the 2GB limitation, but still can't break some other limitation of ByteBuffer interface, like ByteBuffer.position(), ByteBuffer.capacity(), ByteBuffer.remaining(), whose return values are still integers. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPool
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13959067#comment-13959067 ] Reynold Xin commented on SPARK-1391: I took a quick look into this. We are using a bunch of ByteBuffer's throughout the block manager and the communication layer. We need to replace that ByteBuffer with a different interface that can handle larger arrays. It is fortunate that the underlying communication in Connection.scala actually breaks messages down into smaller trunks, so that's one less place to change. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman >Assignee: Min Zhou > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > {noformat} > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13958036#comment-13958036 ] Min Zhou commented on SPARK-1391: - [~shivaram] Yeah, I meant the fastutils involved by spark. From your revision, it's weird that the version is 6.4.4... > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13958014#comment-13958014 ] Shivaram Venkataraman commented on SPARK-1391: -- Oh and yes, I'd be happy to test out any patch / WIP > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13958013#comment-13958013 ] Shivaram Venkataraman commented on SPARK-1391: -- I am not using any fastutil version explicitly. I am just using Spark's master branch from around March 23rd. (The exact commit I am synced to is https://github.com/apache/spark/commit/8265dc7739caccc59bc2456b2df055ca96337fe4) > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13958003#comment-13958003 ] Min Zhou commented on SPARK-1391: - Thank you guys. [~shivaram] Which fastutils version did you use? Is it 6.5.7 as I analyzed ? I will submit a quick and dirty patch these days, not expect to commit it. Just prove my analysis above in your real world cuz I have no spark deployment here to test. Shivaram, would you like to test for me? Thanks in advance. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957957#comment-13957957 ] Shivaram Venkataraman commented on SPARK-1391: -- Yeah I think the right solution is to not create one large array, but somehow either stream the data or break it up into smaller chunks > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957953#comment-13957953 ] Shivaram Venkataraman commented on SPARK-1391: -- [~coderplay] That sounds great to me -- I don't seem to have permissions to assign issues though. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957949#comment-13957949 ] Sean Owen commented on SPARK-1391: -- Of course! Hardly my issue. Well, you could try my patch that replaces fastutil with alternatives. I doubt the standard ByteArrayOutputStream does differently though? But we are always going to have a problem in that a Java byte array can only be so big because of the size of an int, regardless of stream position issues. This one could be deeper. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957939#comment-13957939 ] Min Zhou commented on SPARK-1391: - [~shivaram] [~sowen], if you don't mind, I will take this issue. Actually, I was invited by Reynold to solve this problem. IMHO, fastutils at least can't cover the field "position" be overflow. Maybe my solution is like yours, to replace fastutils with other libs, then we will have some collaboration. May I? > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957889#comment-13957889 ] Sean Owen commented on SPARK-1391: -- No fastutil is nothing to do with it, if in fact the problem is int overflow in the offset. It's basically a limit of how big an array can be in Java. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957844#comment-13957844 ] Shivaram Venkataraman commented on SPARK-1391: -- I just copied the stack trace from the stderr of the executor, but this was towards the end of a long job, so JIT could definitely be an explanation. In case this motivates somebody to pick up this issue :), let me add some context. This is really painful in situations where you have a long chain of transformations cached in memory. Any non-local task launched after that will try to recompute the entire transformation chain as block transfers fail. In terms of solutions, is fastutil the problem here ? Sean, will https://github.com/apache/spark/pull/266 address this by any chance ? > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadP
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957523#comment-13957523 ] Min Zhou commented on SPARK-1391: - The JIT intrinsic will replace the original JNI implemetation of System.arraycopy. Hotspot jvm can't get the symbol of this native method, this is the reason why that stack strace missed. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957522#comment-13957522 ] Min Zhou commented on SPARK-1391: - Finally, I can explain the whole thing. Apologies to Shivaram, you didn't miss anything. >From the above, we can see that the position can be a negative value. Run below code under fastutils 6.5.7 we will get {noformat} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; public class ArrayIndex { public static void main(String[] args) throws Exception { FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096); outputStream.position(Integer.MAX_VALUE); outputStream.write(new byte[1024], 0, 1024); outputStream.close(); } } Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException at java.lang.System.arraycopy(Native Method) at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29) {noformat} We can see the line number FastByteArrayOutputStream.java:96 is correct, the same as Shivaram's. The only different is that the stack frame "at java.lang.System.arraycopy(Native Method)" does exists in Shivaram's report. After some investigation on jdk source code, I get the answer, System.arraycopy got Just-in-Time compiled. {noformat} public class ArrayCopy { public static void main(String[] args) throws Exception { byte[] src = new byte[8]; byte[] dst = new byte[8]; for(int i = 0; i < 10; i++) { System.arraycopy(src,0,dst,0, dst.length); } System.arraycopy(src,0,dst,-1, dst.length); } Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:36) } {noformat} See? That stack trace has gone. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at sca
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957492#comment-13957492 ] Min Zhou commented on SPARK-1391: - >From the line number, the fastutils version should be 6.5.7. The error should be thrown by this line in FastByteArrayOutputStream.java {noformat} System.arraycopy( b, off, array, position, len ); {noformat} > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957468#comment-13957468 ] Min Zhou commented on SPARK-1391: - [~sowen] Yes, neither of the two simulation can exactly match the stack info Shivaram gave us. So I suspected that if something missed there. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957450#comment-13957450 ] Sean Owen commented on SPARK-1391: -- Oops yes I mean offset of course. Good investigation there. I am also not sure why the index would not show. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957446#comment-13957446 ] Min Zhou commented on SPARK-1391: - [~sowen] Yes, there is possibility. It's not a line number, it's instead should have an index or Otherewise it should be a user defined exception or native exception. I greped the fastutil source code, it won't throw AIOOBEs with empty message. And the line number is not corresponding to the v6.4.4 see line 96 of FastByteArrayOutputStream: http://grepcode.com/file/repo1.maven.org/maven2/it.unimi.dsi/fastutil/6.4.4/it/unimi/dsi/fastutil/io/FastByteArrayOutputStream.java Here is the possibility where throws AIOOBEs. The position can be negative due to line {noformat} public void More ...write( final byte[] b, final int off, final int len ) throws IOException { ... if ( position + len > length ) length = position += len; ... } {noformat} Here is the simulation with fastutils under the version of 6.4.4 {noformat} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; public class ArrayOutofIndex { public static void main(String[] args) throws Exception { FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096); outputStream.position(-1); outputStream.write('a'); outputStream.close(); } } Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:92) at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29) {noformat} {noformat} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; public class ArrayOutofIndex { public static void main(String[] args) throws Exception { FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096); outputStream.position(-1); outputStream.write(new byte[1024], 0, 1024); outputStream.close(); } } Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException at java.lang.System.arraycopy(Native Method) at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:98) at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29) {noformat} The line number and stack info is not the same as that be reported. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockM
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957437#comment-13957437 ] Sean Owen commented on SPARK-1391: -- It is possible that it constructs AIOOBE without the line number. I may be stating the obvious but it is all but surely negative and due to int overflow. 2GB strongly suggests it anyway. Would be good to confirm. > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957432#comment-13957432 ] Min Zhou commented on SPARK-1391: - It's supposed to have some detailed message after "java.lang.ArrayIndexOutOfBoundsException" like {noformat}java.lang.ArrayIndexOutOfBoundsException: -2147483648{noformat} > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13957417#comment-13957417 ] Min Zhou commented on SPARK-1391: - Hi Shivaram Is that the entire exception stack? > BlockManager cannot transfer blocks larger than 2G in size > -- > > Key: SPARK-1391 > URL: https://issues.apache.org/jira/browse/SPARK-1391 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 1.0.0 >Reporter: Shivaram Venkataraman > > If a task tries to remotely access a cached RDD block, I get an exception > when the block size is > 2G. The exception is pasted below. > Memory capacities are huge these days (> 60G), and many workflows depend on > having large blocks in memory, so it would be good to fix this bug. > I don't know if the same thing happens on shuffles if one transfer (from > mapper to reducer) is > 2G. > 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer > message > java.lang.ArrayIndexOutOfBoundsException > at > it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) > at > it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) > at > org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) > at > org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) > at > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) > at > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) > at > org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) > at > org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) > at > org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at > org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28) > at > org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34) > at > org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661) > at > org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)