Re: Broadcast variable of size 1 GB fails with negative memory exception
Hi Imran,

Thanks to you and Shivaram for looking into this, and for opening the JIRA/PR. Once the PR is merged, I will update you if any other problems arise from the broadcast.

Mike
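The JIRA referenced above (SPARK-9437) fixed an integer overflow that occurred before a cast to long in SizeEstimator. The bug class is easy to reproduce in isolation; the following is a minimal plain-Java sketch of the pattern, not the actual SizeEstimator code:

```java
public class OverflowDemo {
    public static void main(String[] args) {
        int length = 1 << 28; // number of elements, e.g. 2^28 doubles
        int elemSize = 8;     // bytes per double

        // Buggy pattern: the product is computed in 32-bit int arithmetic
        // and wraps around *before* the widening cast to long.
        long wrong = (long) (length * elemSize);

        // Fixed pattern: widen one operand first so the multiplication
        // itself happens in 64-bit arithmetic.
        long right = (long) length * elemSize;

        System.out.println(wrong); // -2147483648 (2^31 wrapped negative)
        System.out.println(right); // 2147483648
    }
}
```

The same wraparound is what turns a roughly 2 GB size estimate into the negative sizeInBytes values seen in the logs below.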
On 7/29/15, Imran Rashid wrote:
Hi Mike,

I dug into this a little more, and it turns out in this case there is a pretty trivial fix -- the problem you are seeing is just integer overflow before casting to a long in SizeEstimator. I've opened https://issues.apache.org/jira/browse/SPARK-9437 for this.

For now, I think your workaround is to break the vector into multiple arrays and broadcast each of them separately. If it's any consolation, you would have to do this anyway if you tried to broadcast more than Int.MAX_VALUE doubles: the JVM only allows arrays up to that length. So even if Spark didn't have this limitation, you could go up to an array of nearly 2^31 doubles, which would be 16 GB, before having to break it into multiple arrays.

There *are* a number of places in Spark where things are limited to 2 GB, and there are a handful of open issues to deal with that. However, I think this one is pretty easy to fix (I must have looked at this a half-dozen times before and never realized the fix was so simple ...). It could be that we'll run into something else after this particular issue with SizeEstimator is fixed. This certainly won't work with HttpBroadcast, but I think it might just work as long as you stick with TorrentBroadcast.

imran
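The workaround described above, splitting the vector into multiple smaller arrays and broadcasting each one separately, can be sketched with a plain chunking helper. The helper below is hypothetical (not Spark API code); in a real job each chunk would be handed to sc.broadcast individually and reassembled by offset on the executors:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedVector {
    // Split a large vector into pieces of at most maxChunkLen elements,
    // so each piece stays safely below the ~2 GB / Int.MAX_VALUE limits.
    public static List<double[]> split(double[] vector, int maxChunkLen) {
        List<double[]> chunks = new ArrayList<>();
        for (int start = 0; start < vector.length; start += maxChunkLen) {
            int end = Math.min(start + maxChunkLen, vector.length);
            chunks.add(Arrays.copyOfRange(vector, start, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        double[] vec = new double[10];          // stand-in for a 2^28-element vector
        List<double[]> chunks = split(vec, 4);  // chunk length chosen by the caller
        System.out.println(chunks.size());      // 3 (chunks of 4, 4, and 2 elements)
        // Per chunk, one would then broadcast (hypothetical Spark usage):
        //   Broadcast<double[]> b = sc.broadcast(chunk);
    }
}
```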
On Tue, Jul 28, 2015 at 10:56 PM, Mike Hynes <91m...@gmail.com> wrote:
Hi Imran,

Thanks for your reply. I have double-checked the code I ran to generate an nxn matrix and nx1 vector for n = 2^27. There was unfortunately a bug in it: instead of typing 134,217,728 for n = 2^27, I included a third '7' by mistake, making the size 10x larger.

However, even after correcting this, my question about broadcasting still stands: can a variable >= 2 GB in size be transferred? In this case, for n >= 2^28, the broadcast variable crashes, and an array of size MAX_INT cannot be broadcast.

Looking at Chowdhury's "Performance and Scalability of Broadcast in Spark" technical report, I realize that the results are reported only for broadcast variables up to 1 GB in physical size. I was hoping, however, that an Array of size MAX_INT would be transferable via a broadcast (since the previous PR I mentioned seems to have added support for > 2 GB variables), such that the matrix-vector multiplication would scale to MAX_INT x MAX_INT matrices with a broadcast variable.

Would you or anyone on the dev list be able to comment on whether this is possible? Since the (corrected) overflow I'm seeing is for > 2^31 physical bytes being transferred, I am guessing that there is still a physical limitation on how many bytes may be sent via broadcasting, at least for a primitive Array[Double]?

Thanks,
Mike

    19176&INFO&IndexedRowMatrix&Broadcasting vecArray with size 268435456&
    19177&INFO&MemoryStore&ensureFreeSpace(-2147483592) called with
    curMem=6888, maxMem=92610625536&
    19177&INFO&MemoryStore&Block broadcast_2 stored as values in memory
    (estimated size -2147483592.0 B, free 88.3 GB)&
    Exception in thread "main" java.lang.IllegalArgumentException:
    requirement failed: sizeInBytes was negative: -2147483592
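As a side note, the negative estimate in the log above is consistent with exactly one 32-bit wraparound: adding 2^32 back to -2147483592 gives 2147483704 bytes, which is 2^28 doubles (2 GiB) plus 56 bytes, plausibly object overhead. This reconstruction of the estimator's arithmetic is an assumption, but the numbers can be checked directly:

```java
public class WrapCheck {
    public static void main(String[] args) {
        long reported = -2147483592L;            // sizeInBytes from the log
        long actual = reported + (1L << 32);     // undo one 32-bit wraparound
        System.out.println(actual);              // 2147483704
        System.out.println(actual - (1L << 31)); // 56 bytes beyond 2^28 doubles (2 GiB)
    }
}
```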
On 7/28/15, Imran Rashid wrote:
Hi Mike,

Are you sure the size isn't off by 2x somehow? I just tried to reproduce with a simple test in BlockManagerSuite:

    test("large block") {
      store = makeBlockManager(4e9.toLong)
      val arr = new Array[Double](1 << 28)
      println(arr.size)
      val blockId = BlockId("rdd_3_10")
      val result = store.putIterator(blockId, Iterator(arr),
        StorageLevel.MEMORY_AND_DISK)
      result.foreach{println}
    }

It fails at 1 << 28 with nearly the same message, but it's fine for (1 << 28) - 1, with a reported block size of 2147483680. Not exactly the same as what you did, but I expect it to be close enough to exhibit the same error.

On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes <91m...@gmail.com> wrote:

> Hello Devs,
>
> I am investigating how matrix-vector multiplication can scale for an
> IndexedRowMatrix in mllib.linalg.distributed.
>
> Currently, I am broadcasting the vector to be multiplied on the right.
> The IndexedRowMatrix is stored across a cluster with up to 16 nodes,
> each with >200 GB of memory. The Spark driver is on an identical node,
> having more than 200 GB of memory.
>
> In scaling n, the size of the vector to be broadcast, I find that the
> maximum n that I can use is 2^26. For 2^27, the broadcast will fail.
> The array being broadcast is of type Array[Double], so the contents
> have size 2^30 bytes, which is approximately 1 (metric) GB.
>
> I have read in PR [SPARK-3721] [PySpark] "broadcast objects larger
> than 2G" that this should be supported (I assume this works for Scala
> as well?). However, when I increase n to 2^27 or above, the program
> invariably crashes at the broadcast.
>
> The problem stems from the size of the result block to be sent in
> BlockInfo.scala; the size is reportedly negative. An example error log
> is shown below.
>
> If anyone has more experience or knowledge of why this broadcast is
> failing, I'd appreciate the input.
> --
> Thanks,
> Mike
>
> 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with
> curMem=0, maxMem=92610625536:
> 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory
> (estimated size -2147480008.0 B, free 88.3 GB):
> Exception in thread "main" java.lang.IllegalArgumentException:
> requirement failed: sizeInBytes was negative: -2147480008
>   at scala.Predef$.require(Predef.scala:233)
>   at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815)
>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>   at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996)
>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297)
>   at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184)
>   at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172)
>   at himrod.linalg.KrylovTests.main(KrylovTests.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:666)
>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
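For reference, the boundary this thread circles around is the signed 32-bit limit. The arithmetic is straightforward (an illustration only, nothing Spark-specific): 2^27 doubles occupy exactly 1 GiB, while 2^28 doubles occupy 2 GiB, one byte past Integer.MAX_VALUE, which is where a 32-bit size field goes negative:

```java
public class SizeMath {
    public static void main(String[] args) {
        System.out.println((1L << 27) * 8);     // 1073741824 bytes = 1 GiB
        System.out.println((1L << 28) * 8);     // 2147483648 bytes = 2 GiB
        System.out.println(Integer.MAX_VALUE);  // 2147483647
    }
}
```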