Thanks for the answer, Marcelo,

The goal is to keep an intermediate value per row in memory, which would
allow faster subsequent computations; i.e., computeSomething for a row would
depend on the intermediate value computed for that row in the previous
iteration.

If we replicate the entire row (everything except the one changed element),
the computational efficiency disappears, since the copy alone costs O(p) per
row, where p is the number of columns.

computeSomething itself would be O(1) per row, since it only reads two
columns and the intermediate value.
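
Concretely, the shape of the per-row update I have in mind is roughly this
(just a sketch; the column indices and the arithmetic are made up):

// Hypothetical: the new intermediate value depends only on two columns
// and the previous intermediate value, so the work per row is O(1)
// regardless of the number of columns p.
def computeSomething(row: Array[Double], j: Int, k: Int,
                     prev: Double): Double =
  prev + row(j) * row(k)  // placeholder arithmetic, not the real update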

In fact, this kind of scenario comes up often in machine learning, where
keeping a small set of intermediate values from the previous iteration can
yield big efficiency gains, for instance in Coordinate Descent, where much
of the speedup comes from cheap incremental updates to the residuals.
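
To make that concrete in Spark, one way to carry the state explicitly (along
the lines of the chaining Marcelo describes below) might look roughly like
the following untested sketch, where rdd is the RDD of rows from my
pseudocode (quoted below) and initialResidual, updateResidual and
numIterations are hypothetical placeholders:

// Rough sketch, not tested: pair each row with its residual so the
// per-row update is O(1) and the full row is never copied.
var state = rdd.map(row => (row, initialResidual(row))).cache()

for (iter <- 1 to numIterations) {
  val next = state.map { case (row, residual) =>
    (row, updateResidual(row, residual))  // reads two columns + residual
  }.cache()

  // e.g. reduce over the new residuals to track an objective value
  val objective = next.map { case (_, r) => r * r }.reduce(_ + _)
  println("iteration " + iter + ": objective = " + objective)

  state.unpersist()
  state = next
}

The intent is that the only new value per row per iteration is a single
Double, rather than a copy of the whole row.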


On Fri, Apr 18, 2014 at 5:54 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> Hi Sung,
>
> On Fri, Apr 18, 2014 at 5:11 PM, Sung Hwan Chung
> <coded...@cs.stanford.edu> wrote:
> > while (true) {
> >   rdd.map((row : Array[Double]) => {
> >     row(numCols - 1) = computeSomething(row)
> >   }).reduce(...)
> > }
> >
> > If it fails at some point, I'd imagine that the intermediate info being
> > stored in row(numCols - 1) will be lost. And unless Spark runs this whole
> > thing from the very first iteration, things will get out of sync.
>
> I'm not sure I completely follow what you're trying to achieve here.
> But modifications to the "row" argument to the closure won't be seen
> anywhere outside that closure. What the reduce() step will see will be
> the output of the map step; in your code above, you're mapping the
> input to nothing.
>
> If you did this instead:
>
>    rdd.map((row : Array[Double]) => {
>      row(numCols - 1) = computeSomething(row)
>      row
>    }).reduce(...)
>
> Then you'd be mapping the input to an array which just happens to be
> the same as the input of the map function, with a modified item at
> "numCols - 1". Note there isn't any state being kept in "row"; the
> only "state" here is the output of the map function, which might be
> serialized and sent somewhere else to be processed by the closure
> passed to reduce().
>
> If there's an error while processing the map function, Spark will
> re-run it on a different worker node, and you should get the same
> output (given that the input of your RDD is the same).
>
> Also note that your example is always running exactly the same
> computation, since it's always using the same RDD. If you want the
> loop to do some sort of chaining, you'd need to create a new RDD from
> the results of the reduce, and then perform your map/reduce operations
> on that new RDD. And then the same rules as above would apply.
>
> Just to illustrate what I'm trying to say, try running this in a spark
> shell:
>
> var input = List(Array(1,2,3), Array(4,5,6))
> var rows = sc.parallelize(input)
>
> rows.map(row => {
>   row(1) = row(1) * 2
>   row
> }).reduce((a1, a2) => Array(a1(0) + a2(0), a1(1) + a2(1), a1(2) + a2(2)))
>
>
> And note how, after the computation runs, "input" still holds its
> original value, even though the map function modified its input.
>
>
> --
> Marcelo
>
