Hi Mike,

I dug into this a little more, and it turns out there is a pretty
trivial fix in this case -- the problem you are seeing is just integer
overflow before the cast to a Long in SizeEstimator.  I've opened
https://issues.apache.org/jira/browse/SPARK-9437 for this.
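
To illustrate the class of bug (a made-up snippet, not the actual
SizeEstimator code): in Scala, Int * Int arithmetic overflows before
any widening to Long happens, so the fix is just to do the math in
Long from the start.

  val length = 1 << 28                  // 2^28 doubles
  val bad: Long = length * 8            // Int math first: wraps to -2147483648
  val good: Long = length.toLong * 8    // widen before multiplying: 2147483648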

For now, I think your workaround is to break the data into multiple
arrays and broadcast each one separately.  If it's any consolation, you
would have to do this eventually anyway: the JVM only allows arrays of
up to Int.MaxValue elements.  So even if Spark didn't have this
limitation, an array of doubles could only reach just under 2^31
elements, or about 16 GB of data, before you'd have to split it up.
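
A rough sketch of what I mean (untested; it assumes a SparkContext sc
is in scope, and the 2^26-doubles-per-chunk size is just an example):

  val n = 1L << 28                       // total doubles; 2^31 bytes of data
  val chunkSize = 1 << 26                // 2^26 doubles = 512 MB per chunk
  val numChunks = ((n + chunkSize - 1) / chunkSize).toInt
  val chunks = (0 until numChunks).map { i =>
    val start = i.toLong * chunkSize
    val len = math.min(chunkSize.toLong, n - start).toInt
    val arr = new Array[Double](len)
    // ... fill arr(j) with element (start + j) of the full vector ...
    sc.broadcast(arr)
  }
  // element k of the full vector is then
  //   chunks((k / chunkSize).toInt).value((k % chunkSize).toInt)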

There *are* a number of places in Spark where things are limited to
2 GB, and there are a handful of open issues to deal with them.  This
one, however, is pretty easy to fix (I must have looked at this code a
half-dozen times before and never realized the fix was so simple ...).
That said, we may run into something else even after this particular
issue with SizeEstimator is fixed.  Broadcasting this much data
certainly won't work with HttpBroadcast, but I think it might just work
as long as you stick with TorrentBroadcast.
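
If you want to be explicit about that, the broadcast implementation can
be pinned in the SparkConf (on Spark 1.x, where TorrentBroadcastFactory
should already be the default); a minimal sketch:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.broadcast.factory",
         "org.apache.spark.broadcast.TorrentBroadcastFactory")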

imran

On Tue, Jul 28, 2015 at 10:56 PM, Mike Hynes <91m...@gmail.com> wrote:

> Hi Imran,
>
> Thanks for your reply. I have double-checked the code I ran to
> generate an n x n matrix and an n x 1 vector for n = 2^27. There was
> unfortunately a bug in it: instead of typing 134,217,728 for n = 2^27,
> I included a third '7' by mistake, making the size 10x larger.
>
> However, even after correcting this, my question about broadcasting
> still stands: can a variable >= 2 GB in size be transferred? In this
> case, the broadcast crashes for n >= 2^28, and an array of
> Int.MaxValue doubles cannot be broadcast.
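>
> (As a sanity check on the arithmetic: an Array[Double] with n = 2^28
> elements holds 2^28 * 8 = 2^31 = 2,147,483,648 bytes of raw data,
> one more than Int.MaxValue = 2^31 - 1, so any size bookkeeping done
> in a 32-bit Int wraps negative.)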
>
> Looking at Chowdhury's "Performance and Scalability of Broadcast in
> Spark" technical report, I realize that the results are reported only
> for broadcast variables up to 1 GB in physical size. I was hoping,
> however, that an Array of Int.MaxValue elements would be transferable
> via a broadcast (since the PR I mentioned previously seems to have
> added support for > 2 GB variables), such that the matrix-vector
> multiplication would scale to Int.MaxValue x Int.MaxValue matrices
> with a broadcast variable.
>
> Would you or anyone on the dev list be able to comment on whether this
> is possible? Since the (corrected) overflow I'm seeing occurs when more
> than 2^31 bytes are transferred, I am guessing that there is still a
> physical limit on how many bytes may be sent via a broadcast, at least
> for a primitive Array[Double]?
>
> Thanks,
> Mike
>
> 19176&INFO&IndexedRowMatrix&Broadcasting vecArray with size 268435456&
> 19177&INFO&MemoryStore&ensureFreeSpace(-2147483592) called with
> curMem=6888, maxMem=92610625536&
> 19177&INFO&MemoryStore&Block broadcast_2 stored as values in memory
> (estimated size -2147483592.0 B, free 88.3 GB)&
> Exception in thread "main" java.lang.IllegalArgumentException:
> requirement failed: sizeInBytes was negative: -2147483592
>
> On 7/28/15, Imran Rashid <iras...@cloudera.com> wrote:
> > Hi Mike,
> >
> > are you sure the size isn't off by 2x somehow?  I just tried to
> > reproduce it with a simple test in BlockManagerSuite:
> >
> > test("large block") {
> >   store = makeBlockManager(4e9.toLong)
> >   val arr = new Array[Double](1 << 28)
> >   println(arr.size)
> >   val blockId = BlockId("rdd_3_10")
> >   val result = store.putIterator(blockId, Iterator(arr),
> >     StorageLevel.MEMORY_AND_DISK)
> >   result.foreach(println)
> > }
> >
> > It fails at 1 << 28 with nearly the same message, but it's fine for
> > (1 << 28) - 1, with a reported block size of 2147483680.  Not exactly
> > the same as what you did, but I expect it to be close enough to
> > exhibit the same error.
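> >
> > (That boundary is consistent with an Int overflow: (1 << 28) * 8 =
> > 2^31 bytes wraps negative in 32-bit arithmetic, while
> > ((1 << 28) - 1) * 8 = 2,147,483,640 still fits in an Int.  My guess
> > is that the reported 2147483680 is that figure plus ~40 bytes of
> > object overhead, added after widening to Long.)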
> >
> >
> > On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes <91m...@gmail.com> wrote:
> >>
> >> Hello Devs,
> >>
> >> I am investigating how matrix vector multiplication can scale for an
> >> IndexedRowMatrix in mllib.linalg.distributed.
> >>
> >> Currently, I am broadcasting the vector to be multiplied on the right.
> >> The IndexedRowMatrix is stored across a cluster of up to 16 nodes,
> >> each with more than 200 GB of memory. The Spark driver runs on an
> >> identical node, also with more than 200 GB of memory.
> >>
> >> In scaling n, the size of the vector to be broadcast, I find that the
> >> largest n I can use is 2^26; for n = 2^27, the broadcast fails. The
> >> array being broadcast is of type Array[Double], so its contents
> >> occupy 2^27 * 8 = 2^30 bytes, approximately 1 (metric) GB.
> >>
> >> I have read in PR [SPARK-3721] [PySpark] "broadcast objects larger
> >> than 2G" that this should be supported (I assume this works for
> >> Scala as well?). However, when I increase n to 2^27 or above, the
> >> program invariably crashes at the broadcast.
> >>
> >> The problem stems from the size of the result block to be sent in
> >> BlockInfo.scala; the size is reportedly negative. An example error log
> >> is shown below.
> >>
> >> If anyone has more experience or knowledge of why this broadcast is
> >> failing, I'd appreciate the input.
> >> --
> >> Thanks,
> >> Mike
> >>
> >> 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with
> >> curMem=0, maxMem=92610625536:
> >> 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory
> >> (estimated size -2147480008.0 B, free 88.3 GB):
> >> Exception in thread "main" java.lang.IllegalArgumentException:
> >> requirement failed: sizeInBytes was negative: -2147480008
> >>         at scala.Predef$.require(Predef.scala:233)
> >>         at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55)
> >>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815)
> >>         at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
> >>         at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996)
> >>         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> >>         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> >>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> >>         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> >>         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297)
> >>         at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184)
> >>         at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172)
> >>         at himrod.linalg.KrylovTests.main(KrylovTests.scala)
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>         at java.lang.reflect.Method.invoke(Method.java:606)
> >>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:666)
> >>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
> >>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
> >>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118)
> >>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >>
>
> --
> Thanks,
> Mike
>
