Hi Imran,
Thanks to you and Shivaram for looking into this and opening the
JIRA/PR. Once the PR is merged, I will update you if any other problems
arise from the broadcast.
Mike

On 7/29/15, Imran Rashid <iras...@cloudera.com> wrote:
> Hi Mike,
>
> I dug into this a little more, and it turns out in this case there is a
> pretty trivial fix -- the problem you are seeing is just from integer
> overflow before casting to a long in SizeEstimator.  I've opened
> https://issues.apache.org/jira/browse/SPARK-9437 for this.
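The overflow described here can be reproduced in isolation. The snippet below is an illustrative sketch, not the actual SizeEstimator code; it just shows how an Int product wraps around before it is ever widened to a Long:

```scala
// Illustrative only -- not the real SizeEstimator internals.
// 2^28 doubles at 8 bytes each is 2^31 bytes, one past Int.MaxValue.
val length: Int = 1 << 28
val bytesPerElement: Int = 8

// Int * Int overflows first; only then is the wrapped value widened to Long.
val overflowed: Long = length * bytesPerElement      // -2147483648

// Widening *before* multiplying keeps the arithmetic in Long.
val correct: Long = length.toLong * bytesPerElement  // 2147483648

println(s"overflowed = $overflowed, correct = $correct")
```

The fix is essentially the second form: do the multiplication in Long arithmetic.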
>
> For now, I think your workaround is to break the data into multiple
> arrays and broadcast each one separately.  If it's any consolation, you
> would have to do this anyway if you tried to broadcast more than
> Int.MAX_VALUE doubles, since the JVM only allows arrays up to that
> length.  So even if Spark didn't have this limitation, you could only
> go up to an array of about 2^31 doubles, which would be 16GB, before
> having to break into multiple arrays.
>
> There *are* a number of places in Spark where things are limited to 2GB,
> and there are a handful of open issues to deal with it.  This one,
> however, is pretty easy to fix (I must have looked at it a half-dozen
> times and never realized the fix was so simple ...).  That said, we may
> run into something else after this particular issue with SizeEstimator
> is fixed.  This certainly won't work with HttpBroadcast, but I think it
> might just work as long as you stick with TorrentBroadcast.
>
> imran
>
> On Tue, Jul 28, 2015 at 10:56 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Imran,
>>
>> Thanks for your reply. I have double-checked the code I ran to
>> generate an nxn matrix and nx1 vector for n = 2^27. There was
>> unfortunately a bug in it, where instead of having typed 134,217,728
>> for n = 2^27, I included a third '7' by mistake, making the size 10x
>> larger.
>>
>> However, even after correcting this, my question still stands: can a
>> broadcast variable >= 2GB in size be transferred? In this case, for
>> n >= 2^28, the broadcast crashes, and an array of size MAX_INT cannot
>> be broadcast.
>>
>> Looking at Chowdhury's "Performance and Scalability of Broadcast in
>> Spark" technical report, I realize that the results are reported only
>> for broadcast variables up to 1 GB in physical size. I was hoping,
>> however, that an Array of size MAX_INT would be transferable via a
>> broadcast (since the previous PR I mentioned seems to have added
>> support for > 2GB variables) such that the matrix-vector
>> multiplication would scale to MAX_INT x MAX_INT matrices with a
>> broadcast variable.
>>
>> Would you or anyone on the dev list be able to comment on whether this
>> is possible? Since the (corrected) overflow I'm seeing is for > 2^31
>> physical bytes being transferred, I am guessing that there is still a
>> physical limitation on how many bytes may be sent via broadcasting, at
>> least for a primitive Array[Double]?
>>
>> Thanks,
>> Mike
>>
>> 19176&INFO&IndexedRowMatrix&Broadcasting vecArray with size 268435456&
>> 19177&INFO&MemoryStore&ensureFreeSpace(-2147483592) called with
>> curMem=6888, maxMem=92610625536&
>> 19177&INFO&MemoryStore&Block broadcast_2 stored as values in memory
>> (estimated size -2147483592.0 B, free 88.3 GB)&
>> Exception in thread "main" java.lang.IllegalArgumentException:
>> requirement failed: sizeInBytes was negative: -2147483592
>>
>> On 7/28/15, Imran Rashid <iras...@cloudera.com> wrote:
>> > Hi Mike,
>> >
>> > are you sure the size isn't off by 2x somehow?  I just tried to
>> > reproduce with a simple test in BlockManagerSuite:
>> >
>> > test("large block") {
>> >   store = makeBlockManager(4e9.toLong)
>> >   val arr = new Array[Double](1 << 28)
>> >   println(arr.size)
>> >   val blockId = BlockId("rdd_3_10")
>> >   val result = store.putIterator(blockId, Iterator(arr),
>> >     StorageLevel.MEMORY_AND_DISK)
>> >   result.foreach{println}
>> > }
>> >
>> > it fails at 1 << 28 with nearly the same message, but it's fine for
>> > (1 << 28) - 1, with a reported block size of 2147483680.  Not exactly
>> > the same as what you did, but I expect it to be close enough to
>> > exhibit the same error.
>> >
>> >
>> > On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes <91m...@gmail.com> wrote:
>> >>
>> >> Hello Devs,
>> >>
>> >> I am investigating how matrix vector multiplication can scale for an
>> >> IndexedRowMatrix in mllib.linalg.distributed.
>> >>
>> >> Currently, I am broadcasting the vector to be multiplied on the right.
>> >> The IndexedRowMatrix is stored across a cluster with up to 16 nodes,
>> >> each with >200 GB of memory. The Spark driver is on an identical node,
>> >> with more than 200 GB of memory.
>> >>
>> >> In scaling n, the size of the vector to be broadcast, I find that the
>> >> maximum size of n that I can use is 2^26. For 2^27, the broadcast will
>> >> fail. The array being broadcast is of type Array[Double], so the
>> >> contents have size 2^30 bytes, which is approximately 1 (metric) GB.
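As a quick check of the arithmetic above (a standalone sketch, nothing Spark-specific): the payload of an Array[Double] crosses the 2^31-byte mark between n = 2^27 and n = 2^28.

```scala
// Payload sizes for the two array lengths discussed (8 bytes per Double).
val bytesAt27: Long = (1L << 27) * 8  // 2^30 bytes, about 1.07 metric GB
val bytesAt28: Long = (1L << 28) * 8  // 2^31 bytes, just past Int.MaxValue

println(s"2^27 doubles: $bytesAt27 bytes (fits in an Int)")
println(s"2^28 doubles: $bytesAt28 bytes (> Int.MaxValue = ${Int.MaxValue})")
```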
>> >>
>> >> I have read in PR [SPARK-3721] [PySpark] "broadcast objects larger
>> >> than 2G" that this should be supported (I assume this works for
>> >> Scala as well?). However, when I increase n to 2^27 or above, the
>> >> program invariably crashes at the broadcast.
>> >>
>> >> The problem stems from the size of the result block to be sent in
>> >> BlockInfo.scala; the size is reportedly negative. An example error log
>> >> is shown below.
>> >>
>> >> If anyone has more experience or knowledge of why this broadcast is
>> >> failing, I'd appreciate the input.
>> >> --
>> >> Thanks,
>> >> Mike
>> >>
>> >> 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with
>> >> curMem=0, maxMem=92610625536:
>> >> 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory
>> >> (estimated size -2147480008.0 B, free 88.3 GB):
>> >> Exception in thread "main" java.lang.IllegalArgumentException:
>> >> requirement failed: sizeInBytes was negative: -2147480008
>> >>         at scala.Predef$.require(Predef.scala:233)
>> >>         at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55)
>> >>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815)
>> >>         at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>> >>         at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996)
>> >>         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
>> >>         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>> >>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> >>         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
>> >>         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297)
>> >>         at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184)
>> >>         at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172)
>> >>         at himrod.linalg.KrylovTests.main(KrylovTests.scala)
>> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>         at java.lang.reflect.Method.invoke(Method.java:606)
>> >>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:666)
>> >>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
>> >>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
>> >>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118)
>> >>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: dev-h...@spark.apache.org
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>


-- 
Thanks,
Mike
