Aureliano,

The idea is that you would only work on one large chunk at a time. The
chunk size is an optimal trade-off: as big as the cluster can run
simultaneously, but small enough that you don't run into single-machine
memory limits, e.g., on the driver or reducers. It would almost certainly
not need to be the entire 1 billion columns.
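
To put rough numbers on that trade-off, here is a back-of-the-envelope
sizing sketch; the core count, waves-per-chunk and window size below are
assumptions for illustration only, not recommendations:

  // Rough chunk sizing (all numbers are illustrative assumptions)
  val totalCores      = 100                       // cores available across the cluster
  val wavesPerChunk   = 4                         // how many waves of tasks one chunk should yield
  val windowsPerChunk = totalCores * wavesPerChunk
  val bytesPerWindow  = 100L * 50 * 8             // one 100 x 50 window of doubles ~ 40 KB
  val chunkBytes      = windowsPerChunk * bytesPerWindow // ~16 MB; must sit comfortably on the driver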

On the sliding window, I completely missed the fact that the windows are
overlapping. But you could still partition the problem at one more layer:
each partition would be, say, 100 windows rather than 1 as I've
pseudo-coded. So yes, there would be some data duplication at the
boundaries of these window groups, but it is quite minimal and of little
engineering concern (here, about 1% at any given time).
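
Concretely, something like this is what I have in mind, but only as a
sketch: chunkCols (the columns of the current chunk, one 100-double array
per column, held on the driver) and processWindow are made-up names, and
I'm assuming the window slides by one column:

  val windowWidth = 50   // columns per sliding window
  val step        = 1    // slide by one column
  val groupSize   = 100  // windows per group; a tunable design variable

  // chunkCols: Array[Array[Double]] -- hypothetical; one column (100 doubles) per entry
  val numWindows = chunkCols.length - windowWidth + 1
  val groups =
    (0 until numWindows by groupSize).map { start =>
      // each group carries its own columns, including the overlap it needs at its right edge
      chunkCols.slice(start, math.min(start + groupSize + windowWidth - 1, chunkCols.length))
    }

  val results = sc
    .parallelize(groups, groups.length)
    .flatMap { cols =>
      // run every window of the group serially inside one task
      cols.sliding(windowWidth, step).map(processWindow)
    }
    .collect()

The (windowWidth - 1)-column overlap exists only between neighboring
groups, which is the boundary duplication I mentioned above.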

Finally, and perhaps orthogonally, it could be interesting for you to
examine what exactly is going on inside the very long-running compute
itself, as there may be opportunities for parallel speed-ups there.
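
Relatedly, if you want to try Tom's mapPartitions-with-threads idea from
further down the thread, the inner loop of the sketch above could fan its
windows out over threads instead of running them serially. A rough variant,
using Scala parallel collections as just one way to get the threading (same
made-up names as above):

  val results = sc
    .parallelize(groups, groups.length)
    .flatMap { cols =>
      // same grouping as before, but the windows of one group run on a thread pool
      cols.sliding(windowWidth, step).toVector.par.map(processWindow).seq
    }
    .collect()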

Sent while mobile. Pls excuse typos etc.
On Dec 20, 2013 2:56 PM, "Aureliano Buendia" <buendia...@gmail.com> wrote:

>
>
>
> On Fri, Dec 20, 2013 at 10:34 PM, Christopher Nguyen <c...@adatao.com>wrote:
>
>> Aureliano, would something like this work? The red code is the only
>> place where you have to think parallel here.
>>
>>
>> while (thereIsStillDataToWorkOn) {
>>   // N is a design variable: how many 100 x 50 windows to read per chunk
>>   val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData()
>>   val bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
>>     restructureIntoArraysOf5000Doubles(bigChunk)
>>   val results = sc
>>     .parallelize(bigChunkAsArraysOf5000Doubles, numSlices) // numSlices: another design variable
>>     .map(eachArrayOf5000Doubles => someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>>     .collect()
>> }
>>
>>
>> Next, pardon me if this is too basic, but in case it is helpful: this
>> code first runs on a single machine, called a Driver, which must have
>> access to the source data.
>>
>
> Thanks for the clear explanation.
>
>
>> When we call parallelize(), Spark handles all the partitioning of the
>> data into the available Workers, including serializing each data partition
>> to the Workers, and collecting the results back in one place.
>>
>
> This would create nearly 1 billion RDDs. Is that OK?
>
>
>> There is no data duplication other than the Worker's copy of the data
>> from the Driver.
>>
>
> Each 50-column window away from the boundary shares 2*(49 + 48 + 47 + ... + 1)
> columns with the sliding windows on its left and right. All of these columns
> are sent over the network many times. Isn't that duplication of data transfer?
>
>
>>
>> This indeed does not take advantage of all of the other available Spark
>> goodnesses that others have correctly pointed out on this thread, such as
>> broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
>> partitions, etc. But it would be exactly the right thing to do if it best
>> fits your problem statement.
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia 
>> <buendia...@gmail.com>wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <c...@adatao.com>wrote:
>>>
>>>> Aureliano, how would your production data be coming in and accessed?
>>>> It's possible that you can still think of that level as a serial operation
>>>> (outer loop, large chunks) first before worrying about parallelizing the
>>>> computation of the tiny chunks.
>>>>
>>>
>>> It's batch processing of time-series data. Perhaps serial processing
>>> where each serial item is a set of parallel processes could be an option.
>>> Does Spark have such an option?
>>>
>>>
>>>>
>>>> And I'm reading that when you refer to "data duplication", you're
>>>> worried about that as a side-effect problem, not as a requirement, correct?
>>>>
>>>
>>> That's right. Data duplication is certainly not a requirement; we are
>>> not trying to avoid it, but if it's a side effect that leads to
>>> considerable I/O overhead, it's not going to be good.
>>>
>>>
>>>> And if the former, I don't see that data duplication is a necessary
>>>> side effect. Unless I missed something in the thread, don't use broadcast.
>>>>
>>>
>>> I take it that overlapping partitions do not mean data duplication. I
>>> wasn't sure whether partitions hold a copy or a reference.
>>>
>>>
>>>>
>>>> Put another way, I see the scale of this challenge as far more
>>>> operational than logical (when squinted at from the right angle :)
>>>>
>>>>  --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <
>>>> buendia...@gmail.com> wrote:
>>>>
>>>>> Also, overthinking is appreciated in this problem, as my production
>>>>> data is actually near 100 x 1,000,000,000 and data duplication could
>>>>> get messy with this.
>>>>>
>>>>> Sorry about the initial misinformation, I was thinking about my
>>>>> development/test data.
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <
>>>>> buendia...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <minnesota...@gmail.com>wrote:
>>>>>>
>>>>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>>>>> which would be a fraction of a standard cluster.  But since overthinking
>>>>>>> is a lot of fun, how about this: do a mapPartitions with a threaded
>>>>>>> subtask for each window.  Now you only need to replicate data across the
>>>>>>> boundaries of each partition of windows, rather than each window.
>>>>>>>
>>>>>>
>>>>>> How can this be written in Spark Scala?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen 
>>>>>>> <c...@adatao.com>wrote:
>>>>>>>
>>>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>>>> task is hugely expensive, stateless from window to window, and the
>>>>>>>> original big matrix is just 1GB, the primary gain in using a parallel
>>>>>>>> engine is in distributing and scheduling these (long-running, isolated)
>>>>>>>> tasks. I'm reading that data loading and distribution are going to be a
>>>>>>>> tiny fraction of the overall compute time.
>>>>>>>>
>>>>>>>> If that's the case, it would make sense simply to start with a 1GB
>>>>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000
>>>>>>>> rows of 5,000 doubles each, map them out to the workers, and have them
>>>>>>>> interpret what the 5,000 doubles mean in terms of a [100 x 50]
>>>>>>>> sub-matrix. They each have a good fraction of several days to figure it
>>>>>>>> out :)
>>>>>>>>
>>>>>>>> This would be a great load test for Spark's resiliency over
>>>>>>>> long-running computations.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Christopher T. Nguyen
>>>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>>>> linkedin.com/in/ctnguyen
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>>>>>> free...@adatao.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>>>> I am thinking out loud here: one way of dealing with this is to improve
>>>>>>>>> NLineInputFormat so that partitions have a small overlapping portion;
>>>>>>>>> in this case the overlapping portion is 50 columns.
>>>>>>>>> So let's say the matrix is divided into overlapping partitions like
>>>>>>>>> this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], ... Then
>>>>>>>>> we can assign each partition to a mapper and do mapPartitions on it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <
>>>>>>>>> free...@adatao.com> wrote:
>>>>>>>>>
>>>>>>>>> Here, Tom assumed that you have your big matrix already loaded on one
>>>>>>>>> machine. Now, if you want to distribute it to slave nodes, you will
>>>>>>>>> need to broadcast it. I would expect this broadcast to be done once at
>>>>>>>>> the beginning of your algorithm, and the computation time will dominate
>>>>>>>>> the overall execution time.
>>>>>>>>>
>>>>>>>>> On the other hand, a better way to deal with a huge matrix is to store
>>>>>>>>> the data in HDFS and load it into each slave partition-by-partition.
>>>>>>>>> This is a fundamental data-processing pattern in the Spark/Hadoop world.
>>>>>>>>> If you opt to do this, you will have to use a suitable InputFormat to
>>>>>>>>> make sure each partition has the right number of rows. For example, if
>>>>>>>>> you are lucky and each HDFS partition has exactly n*50 rows, then you
>>>>>>>>> can use rdd.mapPartitions(func), where func will take care of splitting
>>>>>>>>> each n*50-row partition into n sub-matrices.
>>>>>>>>>
>>>>>>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>>>>>> guarantee that each partition has a certain number of rows. What you
>>>>>>>>> want is NLineInputFormat, which I think has not been pulled into Spark
>>>>>>>>> yet.
>>>>>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>>>>>> should be pretty easy.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <
>>>>>>>>> buendia...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <minnesota...@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I see.  I was thinking that there was a computational dependency
>>>>>>>>>> from one window to the next.  If the computations are independent,
>>>>>>>>>> then I think Spark can help you out quite a bit.
>>>>>>>>>>
>>>>>>>>>> I think you would want an RDD where each element is a window of your
>>>>>>>>>> dense matrix.  I'm not aware of a way to distribute the windows of the
>>>>>>>>>> big matrix in a way that doesn't involve broadcasting the whole thing.
>>>>>>>>>> You might have to tweak some config options, but I think it would work
>>>>>>>>>> straightaway.  I would initialize the data structure like this:
>>>>>>>>>>
>>>>>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>>>> val distributedChunks = sc
>>>>>>>>>>   .parallelize(0 until numWindows)
>>>>>>>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>>>>> sharing big data mean a big network I/O overhead compared to calling
>>>>>>>>> parallelize, or is this overhead optimized away by the partitioning?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Then just apply your matrix ops as a map on that RDD.
>>>>>>>>>>
>>>>>>>>>> You may have your own tool for dense matrix ops, but I would suggest
>>>>>>>>>> Scala Breeze.  You'll have to use an old version of Breeze (current
>>>>>>>>>> builds are for 2.10).  Spark with Scala 2.10 is a little way off.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>>>>>> buendia...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <
>>>>>>>>>>> minnesota...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>>>>>>>> limited to 100 concurrent tasks.  You could use a column
>>>>>>>>>>>> decomposition and access subsets of the columns with a
>>>>>>>>>>>> PartitionPruningRDD.  I have to say, though, if you're doing dense
>>>>>>>>>>>> matrix operations, they will be hundreds of times faster on a
>>>>>>>>>>>> shared-memory platform.  This particular matrix, at 800 MB, could be
>>>>>>>>>>>> a Breeze on a single node.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The computation for every submatrix is very expensive; it takes days
>>>>>>>>>>> on a single node. I was hoping this could be reduced to hours or
>>>>>>>>>>> minutes with Spark.
>>>>>>>>>>>
>>>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
