Hi Imran,

Thanks to you and Shivaram for looking into this and opening the JIRA/PR.
I will update you once the PR is merged if any other problems arise from
the broadcast.

Mike
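[Editor's note: Imran's diagnosis in the thread below -- integer overflow before the cast to a long in SizeEstimator -- can be reproduced in a few lines of plain Scala. This is an illustrative sketch, not Spark's actual code; the variable names are invented here.]

```scala
// n doubles at 8 bytes each: the product is computed in Int arithmetic,
// wraps around to a negative value, and only then is widened to Long.
val n = 1 << 28                       // 268435456 elements, as in the failing run

val overflowed: Long = (n * 8).toLong // Int multiply wraps before the cast
val correct: Long    = n.toLong * 8   // widen to Long first, then multiply

println(overflowed)  // -2147483648: negative, like the sizeInBytes in the logs
println(correct)     //  2147483648: the true size, just over 2 GB
```

This matches the symptom in the logs below: the estimated block size comes out negative, so BlockInfo's `require(sizeInBytes >= 0)` fails.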
On 7/29/15, Imran Rashid <iras...@cloudera.com> wrote:
> Hi Mike,
>
> I dug into this a little more, and it turns out there is a pretty
> trivial fix in this case -- the problem you are seeing is just integer
> overflow before the cast to a long in SizeEstimator. I've opened
> https://issues.apache.org/jira/browse/SPARK-9437 for this.
>
> For now, I think your workaround is to break the data into multiple
> arrays and broadcast each separately. If it's any consolation, you
> would have to do this anyway to broadcast more than Int.MaxValue
> doubles, since the JVM only allows arrays up to that length. So even
> if Spark didn't have this limitation, you could only go up to an array
> of 2^31 doubles, which would be 16 GB, before having to break it into
> multiple arrays.
>
> There *are* a number of places in Spark where things are limited to
> 2 GB, and there are a handful of open issues to deal with it. However,
> I think this one is easy to fix (I must have looked at this code a
> half-dozen times before without ever realizing the fix was so simple).
> It could be that we'll run into something else once this particular
> issue with SizeEstimator is fixed. This certainly won't work with
> HttpBroadcast, but I think it might just work as long as you stick
> with TorrentBroadcast.
>
> imran
>
> On Tue, Jul 28, 2015 at 10:56 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Imran,
>>
>> Thanks for your reply. I have double-checked the code I ran to
>> generate an n x n matrix and n x 1 vector for n = 2^27. There was
>> unfortunately a bug in it: instead of typing 134,217,728 for
>> n = 2^27, I included a third '7' by mistake, making the size 10x
>> larger.
>>
>> However, even after correcting this, my question about broadcasting
>> still stands: can a variable >= 2 GB in size be transferred? In this
>> case, for n >= 2^28, the broadcast crashes, and an array of length
>> Int.MaxValue cannot be broadcast.
>>
>> Looking at Chowdhury's "Performance and Scalability of Broadcast in
>> Spark" technical report, I realize that the results are reported
>> only for broadcast variables up to 1 GB in physical size. I was
>> hoping, however, that an Array of length Int.MaxValue would be
>> transferable via a broadcast (since the previous PR I mentioned
>> seems to have added support for > 2 GB variables), such that the
>> matrix-vector multiplication would scale to Int.MaxValue x
>> Int.MaxValue matrices with a broadcast variable.
>>
>> Would you or anyone on the dev list be able to comment on whether
>> this is possible? Since the (corrected) overflow I'm seeing occurs
>> for > 2^31 physical bytes transferred, I am guessing that there is
>> still a physical limitation on how many bytes may be sent via
>> broadcasting, at least for a primitive Array[Double]?
>>
>> Thanks,
>> Mike
>>
>> 19176&INFO&IndexedRowMatrix&Broadcasting vecArray with size 268435456&
>> 19177&INFO&MemoryStore&ensureFreeSpace(-2147483592) called with
>> curMem=6888, maxMem=92610625536&
>> 19177&INFO&MemoryStore&Block broadcast_2 stored as values in memory
>> (estimated size -2147483592.0 B, free 88.3 GB)&
>> Exception in thread "main" java.lang.IllegalArgumentException:
>> requirement failed: sizeInBytes was negative: -2147483592
>>
>> On 7/28/15, Imran Rashid <iras...@cloudera.com> wrote:
>> > Hi Mike,
>> >
>> > Are you sure the size isn't off by 2x somehow? I just tried to
>> > reproduce with a simple test in BlockManagerSuite:
>> >
>> >   test("large block") {
>> >     store = makeBlockManager(4e9.toLong)
>> >     val arr = new Array[Double](1 << 28)
>> >     println(arr.size)
>> >     val blockId = BlockId("rdd_3_10")
>> >     val result = store.putIterator(blockId, Iterator(arr),
>> >       StorageLevel.MEMORY_AND_DISK)
>> >     result.foreach { println }
>> >   }
>> >
>> > It fails at 1 << 28 with nearly the same message, but it's fine
>> > for (1 << 28) - 1, with a reported block size of 2147483680.
>> > Not exactly the same as what you did, but I expect it to be close
>> > enough to exhibit the same error.
>> >
>> > On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes <91m...@gmail.com> wrote:
>> >>
>> >> Hello Devs,
>> >>
>> >> I am investigating how matrix-vector multiplication can scale for
>> >> an IndexedRowMatrix in mllib.linalg.distributed.
>> >>
>> >> Currently, I am broadcasting the vector to be multiplied on the
>> >> right. The IndexedRowMatrix is stored across a cluster of up to
>> >> 16 nodes, each with >200 GB of memory. The Spark driver runs on
>> >> an identical node with more than 200 GB of memory.
>> >>
>> >> In scaling n, the size of the vector to be broadcast, I find that
>> >> the maximum n I can use is 2^26. For 2^27, the broadcast fails.
>> >> The array being broadcast is of type Array[Double], so the
>> >> contents have size 2^30 bytes, which is approximately 1 (metric)
>> >> GB.
>> >>
>> >> I have read in PR [SPARK-3721] [PySpark] "broadcast objects
>> >> larger than 2G" that this should be supported (I assume this
>> >> works for Scala as well?). However, when I increase n to 2^27 or
>> >> above, the program invariably crashes at the broadcast.
>> >>
>> >> The problem stems from the size of the result block to be sent in
>> >> BlockInfo.scala; the size is reportedly negative. An example
>> >> error log is shown below.
>> >>
>> >> If anyone has experience with or knowledge of why this broadcast
>> >> is failing, I'd appreciate the input.
>> >>
>> >> --
>> >> Thanks,
>> >> Mike
>> >>
>> >> 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with
>> >> curMem=0, maxMem=92610625536:
>> >> 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory
>> >> (estimated size -2147480008.0 B, free 88.3 GB):
>> >> Exception in thread "main" java.lang.IllegalArgumentException:
>> >> requirement failed: sizeInBytes was negative: -2147480008
>> >>   at scala.Predef$.require(Predef.scala:233)
>> >>   at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55)
>> >>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815)
>> >>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>> >>   at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996)
>> >>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
>> >>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>> >>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> >>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
>> >>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297)
>> >>   at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184)
>> >>   at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172)
>> >>   at himrod.linalg.KrylovTests.main(KrylovTests.scala)
>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>   at java.lang.reflect.Method.invoke(Method.java:606)
>> >>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:666)
>> >>   at
>> >> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
>> >>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
>> >>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118)
>> >>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>> --
>> Thanks,
>> Mike

--
Thanks,
Mike
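[Editor's note: the workaround Imran suggests in the thread -- breaking the data into multiple arrays and broadcasting each separately -- can be sketched as below. This is illustrative only: `chunk`, `rowDot`, and `maxLen` are names invented here, and the `sc.broadcast` usage is shown in a comment since it requires a live SparkContext.]

```scala
// Split a large vector into slices no longer than maxLen elements, so each
// slice stays under the JVM's Int.MaxValue array-length limit (and under
// Spark's 2 GB block-size limits).
def chunk(vec: Array[Double], maxLen: Int): Seq[Array[Double]] =
  vec.grouped(maxLen).toSeq

// Dot product of one matrix row against the chunked vector: walk the
// chunks in order, tracking the offset into the full (logical) vector.
def rowDot(row: Array[Double], chunks: Seq[Array[Double]]): Double = {
  var acc = 0.0
  var offset = 0
  for (c <- chunks) {
    var i = 0
    while (i < c.length) {
      acc += row(offset + i) * c(i)
      i += 1
    }
    offset += c.length
  }
  acc
}

// With a SparkContext sc, each slice would be broadcast separately, e.g.:
//   val bcChunks = chunk(vec, 1 << 26).map(sc.broadcast)
// and executors would compute against bcChunks.map(_.value) without ever
// materializing a single >2 GB block.
```

Each broadcast block then stays below the 2 GB size limits discussed above, at the cost of bookkeeping the offsets on the executor side.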