Re: How to map each line to (line number, line)?

2013-12-30 Thread Tom Vacek
You're making assumptions about the partitioning.


On Mon, Dec 30, 2013 at 1:18 PM, Aureliano Buendia wrote:

>
>
>
> On Mon, Dec 30, 2013 at 6:48 PM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>
>>
>>
>> It won't last for long: for now my datasets are small enough,
>>> but I'll have to change it someday
>>>
>>
>>  How does it depend on the dataset size? Are you saying zipWithIndex is
>> slow for bigger datasets?
>>
>>
>> No, but for a dataset of several tens of billions of elements, you cannot
>> fit your elements in memory on a single host.
>>
>> So at some point the solution I currently use (not the one I sent),
>> dataset.collect().zipWithIndex, just won't scale.
>>
>> Did you try the code I sent? I think the sortBy is probably in the wrong
>> direction, so change it to -i instead of i.
>>
>
> I'm confused about why we would need in-memory sorting. We just use a loop
> like any other loop in Spark. Why shouldn't this solve the problem?:
>
> val count = lines.count() // lines is the rdd
> val partitionLinesCount = count / lines.partitions.length
> val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>   var i = pi * partitionLinesCount
>   it.map { line =>
>     val numbered = (i, line)
>     i += 1
>     numbered
>   }
> }
>
>
>>
>> Guillaume
>>
>> --
>>[image: eXenSa]
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>

Re: How to map each line to (line number, line)?

2013-12-30 Thread Tom Vacek
Yes, but a (partitionID, partitionIndex) tuple is a unique identifier
that's just as useful---and you can map that to unique line numbers at any
time.  For example:

myRdd.mapPartitionsWithIndex( (id, it) =>
  it.zipWithIndex.map { case (el, fID) => ((id, fID), el) } )
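
If you later need actual line numbers, a rough sketch of turning those keys
into global offsets (untested; it costs one extra pass to count each
partition, and the names follow the snippet above):

val keyed = myRdd.mapPartitionsWithIndex((id, it) =>
  it.zipWithIndex.map { case (el, fID) => ((id, fID), el) })

// One extra pass to learn how many elements each partition holds.
val sizes = myRdd.mapPartitionsWithIndex((id, it) => Iterator((id, it.size)))
  .collect().sortBy(_._1).map(_._2)
// starts(id) = number of elements in all partitions before partition id
val starts = sizes.scanLeft(0L)(_ + _)

// Global line number = start of the partition + index within the partition.
val numbered = keyed.map { case ((id, fID), el) => (starts(id) + fID, el) }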


On Mon, Dec 30, 2013 at 8:41 AM, Aureliano Buendia wrote:

> One thing that could make this more complicated is partitioning.
>
>
> On Mon, Dec 30, 2013 at 12:28 PM, Aureliano Buendia 
> wrote:
>
>> Hi,
>>
>> When reading a simple text file in spark, what's the best way of mapping
>> each line to (line number, line)? RDD doesn't seem to have an equivalent of
>> zipWithIndex.
>>
>
>


Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Tom Vacek
OK, broadcasting is probably out.  Let's assume that you have n nodes with
k cores per node.  Each node will process k windows, most of which will be
in common (hopefully), so each node will need the range of columns in
[node_id*k, (node_id+1)*k+50-1], and your data over-replication will be
50*n.

This is where my knowledge of Spark is running out.  It is certainly
syntactically possible to spin up threads inside of a map, but whether
those will run in parallel (assuming there are available cores on the
node)...I'm not really sure.  It is simply a matter of configuration to
create free cores on each node.  Anyone?

After that, you'd need to get the data into an RDD in the right form.  I
think Michael has the right idea, but the first question needs to be
answered.  You could also get really creative and put the matrix in a web
server and have each task download the columns it needs.
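
On the threads-inside-a-map question, a rough sketch of one way to try it
with Scala parallel collections inside mapPartitions (whether it helps
depends on how many cores the executor actually has to spare; windowGroups,
assumed here to be an RDD[Seq[Int]] of window-ID batches, and processWindow
are made-up names):

val results = windowGroups.mapPartitions { it =>
  // Each element of the partition is a batch of window IDs; .par fans the
  // batch out over the worker's spare cores.  processWindow is hypothetical.
  it.flatMap { windowIDs =>
    windowIDs.par.map(w => processWindow(w)).seq
  }
}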





On Fri, Dec 20, 2013 at 3:07 PM, Aureliano Buendia wrote:

> Also, overthinking is appreciated in this problem, as my production data
> is actually near 100 x 1,000,000,000, and data duplication could get messy
> at that scale.
>
> Sorry about the initial misinformation, I was thinking about my
> development/test data.
>
>
> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia 
> wrote:
>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek wrote:
>>
>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>> which would be a fraction of a standard cluster.  But since overthinking is a
>>> lot of fun, how about this: do a mapPartitions with a threaded subtask for
>>> each window.  Now you only need to replicate data across the boundaries of
>>> each partition of windows, rather than each window.
>>>
>>
>> How can this be written in Spark Scala?
>>
>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen wrote:
>>>
>>>> Are we over-thinking the problem here? Since the per-window compute
>>>> task is hugely expensive, stateless from window to window, and the original
>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>> reading that data loading and distribution are going to be a tiny fraction
>>>> of the overall compute time.
>>>>
>>>> If that's the case, it would make sense simply to start with a 1GB
>>>> Array[Double] on the driver, from that create an RDD comprising 20,000 rows
>>>> of 5,000 doubles each, map them out to the workers and have them interpret
>>>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
>>>> have a good fraction of several days to figure it out :)
>>>>
>>>> This would be a great load test for Spark's resiliency over
>>>> long-running computations.
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>> free...@adatao.com> wrote:
>>>>
>>>>> Hmm, I misread that you need a sliding window.
>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>> improve NLineInputFormat so that partitions will have a small overlapping
>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>> So let's say the matrix is divided into overlapping partitions like this:
>>>>> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]] … then we can
>>>>> assign each partition to a mapper and do mapPartitions on it.
>>>>>
>>>>>
>>>>> 
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui 
>>>>> wrote:
>>>>>
>>>>> Here, Tom assumed that you have your big matrix already loaded on
>>>>> one machine. Now if you want to distribute it to slave nodes you will
>>>>> need to broadcast it. I would expect this broadcasting will be done once
>>>>> at the beginning of your algorithm and the computation time will dominate
>>>>> the overall execution time.

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Tom Vacek
Totally agree.  Even with a 50x data replication, that's only 40 GB, which
would be a fraction of a standard cluster.  But since overthinking is a lot
of fun, how about this: do a mapPartitions with a threaded subtask for each
window.  Now you only need to replicate data across the boundaries of each
partition of windows, rather than each window.
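
A rough sketch of what that could look like (windowsPerGroup, numWindows,
sliceColumns, and processWindow are all made-up names; sliceColumns stands
in for however the columns reach the worker, e.g. a broadcast or an HDFS
read):

val windowsPerGroup = 20000   // tune so the number of groups ~ number of cores
val groups = (0 until numWindows by windowsPerGroup)
  .map(start => (start, math.min(start + windowsPerGroup, numWindows)))
val results = sc.parallelize(groups, groups.length).flatMap { case (start, end) =>
  // Windows [start, end) need columns [start, end - 1 + 50); only the
  // 49-column overlap at each boundary is replicated between partitions.
  val cols = sliceColumns(start, end - 1 + 50)
  (start until end).map(w => processWindow(cols, w - start))
}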


On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen  wrote:

> Are we over-thinking the problem here? Since the per-window compute task
> is hugely expensive, stateless from window to window, and the original big
> matrix is just 1GB, the primary gain in using a parallel engine is in
> distributing and scheduling these (long-running, isolated) tasks. I'm
> reading that data loading and distribution are going to be a tiny fraction
> of the overall compute time.
>
> If that's the case, it would make sense simply to start with a 1GB
> Array[Double] on the driver, from that create an RDD comprising 20,000 rows
> of 5,000 doubles each, map them out to the workers and have them interpret
> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
> have a good fraction of several days to figure it out :)
>
> This would be a great load test for Spark's resiliency over long-running
> computations.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui 
> wrote:
>
>> Hmm, I misread that you need a sliding window.
>> I am thinking out loud here: one way of dealing with this is to improve
>> NLineInputFormat so that partitions will have a small overlapping portion;
>> in this case the overlapping portion is 50 columns.
>> So let's say the matrix is divided into overlapping partitions like this:
>> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]] … then we can
>> assign each partition to a mapper and do mapPartitions on it.
>>
>>
>> 
>> Michael (Bach) Bui, PhD,
>> Senior Staff Architect, ADATAO Inc.
>> www.adatao.com
>>
>>
>>
>>
>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui 
>> wrote:
>>
>> Here, Tom assumed that you have your big matrix already loaded on
>> one machine. Now if you want to distribute it to slave nodes you will need
>> to broadcast it. I would expect this broadcasting will be done once at the
>> beginning of your algorithm and the computation time will dominate the
>> overall execution time.
>>
>> On the other hand, a better way to deal with a huge matrix is to store the
>> data in HDFS and load the data into each slave partition by partition. This
>> is the fundamental data processing pattern in the Spark/Hadoop world.
>> If you opt to do this, you will have to use a suitable InputFormat to make
>> sure each partition has the number of rows that you want.
>> For example, if you are lucky and each HDFS partition has exactly n*50 rows,
>> then you can use rdd.mapPartitions(func), where func will take care of
>> splitting the n*50-row partition into n sub-matrices.
>>
>> However, HDFS TextInputFormat or SequenceFileInputFormat will not guarantee
>> that each partition has a certain number of rows. What you want is
>> NLineInputFormat, which I think has not been pulled into Spark yet.
>> If everyone thinks this is needed, I can implement it quickly; it should
>> be pretty easy.
>>
>>
>> 
>> Michael (Bach) Bui, PhD,
>> Senior Staff Architect, ADATAO Inc.
>> www.adatao.com
>>
>>
>>
>>
>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia 
>> wrote:
>>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek wrote:
>>
>>> Oh, I see.  I was thinking that there was a computational dependency on
>>> one window to the next.  If the computations are independent, then I think
>>> Spark can help you out quite a bit.
>>>
>>> I think you would want an RDD where each element is a window of your
>>> dense matrix.  I'm not aware of a way to distribute the windows of the big
>>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>>> might have to tweak some config options, but I think it would work
>>> straightaway.  I would initialize the data structure like this:
>>> val matB = sc.broadcast(myBigDenseMatrix)
>>> val distributedChunks = sc.parallelize(0 until
>>> numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value,
>>> windowID) ) )
>>>

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Tom Vacek
Oh, I see.  I was thinking that there was a computational dependency on one
window to the next.  If the computations are independent, then I think
Spark can help you out quite a bit.

I think you would want an RDD where each element is a window of your dense
matrix.  I'm not aware of a way to distribute the windows of the big matrix
in a way that doesn't involve broadcasting the whole thing.  You might have
to tweak some config options, but I think it would work straightaway.  I
would initialize the data structure like this:
val matB = sc.broadcast(myBigDenseMatrix)
val distributedChunks = sc.parallelize(0 until numWindows).mapPartitions(it
=> it.map(windowID => getWindow(matB.value, windowID) ) )

Then just apply your matrix ops as a map on the RDD.
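
For concreteness, a rough sketch of what getWindow might look like if
myBigDenseMatrix is kept as a flat row-major Array[Double] (that layout is
my assumption; getWindow and myBigDenseMatrix are the names from the
snippet above):

// Extract the nRows x width window starting at column windowID from a
// row-major matrix stored as a flat Array[Double].
def getWindow(mat: Array[Double], windowID: Int,
              nRows: Int = 100, nCols: Int = 1000000, width: Int = 50): Array[Double] = {
  val out = new Array[Double](nRows * width)
  var r = 0
  while (r < nRows) {
    // copy columns [windowID, windowID + width) of row r
    System.arraycopy(mat, r * nCols + windowID, out, r * width, width)
    r += 1
  }
  out
}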

You may have your own tool for dense matrix ops, but I would suggest
Scala Breeze.  You'll have to use an old version of Breeze (current builds
are for Scala 2.10), and Spark with Scala 2.10 is still a little way off.
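
For example, a minimal Breeze sketch of the per-window dense ops (the
shapes come from this thread; the ones vector and the copy loop are just
placeholders, and note that Breeze stores DenseMatrix column-major):

import breeze.linalg.{DenseMatrix, DenseVector}

val window: Array[Double] = getWindow(matB.value, 0)   // row-major 100 x 50
val m = DenseMatrix.zeros[Double](100, 50)
for (r <- 0 until 100; c <- 0 until 50) m(r, c) = window(r * 50 + c)

val v = DenseVector.ones[Double](50)
val result: DenseVector[Double] = m * v                 // matrix-vector product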


On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia wrote:

>
>
>
> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek  wrote:
>
>> If you use an RDD[Array[Double]] with a row decomposition of the matrix,
>> you can index windows of the rows all you want, but you're limited to 100
>> concurrent tasks.  You could use a column decomposition and access subsets
>> of the columns with a PartitionPruningRDD.  I have to say, though, if
>> you're doing dense matrix operations, they will be 100s of times faster on
>> a shared mem platform.  This particular matrix, at 800 MB could be a Breeze
>> on a single node.
>>
>
> The computation for every submatrix is very expensive, it takes days on a
> single node. I was hoping this can be reduced to hours or minutes with
> spark.
>
> Are you saying that spark is not suitable for this type of job?
>


Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Tom Vacek
If you use an RDD[Array[Double]] with a row decomposition of the matrix,
you can index windows of the rows all you want, but you're limited to 100
concurrent tasks.  You could use a column decomposition and access subsets
of the columns with a PartitionPruningRDD.  I have to say, though, if
you're doing dense matrix operations, they will be 100s of times faster on
a shared mem platform.  This particular matrix, at 800 MB could be a Breeze
on a single node.
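
For reference, a rough sketch of the column-decomposition idea (this
assumes PartitionPruningRDD's constructor takes the parent RDD and a
partition-index predicate, which is worth double-checking against your
Spark version; colChunks and the partition indices are hypothetical):

import org.apache.spark.rdd.PartitionPruningRDD

// colChunks: an assumed RDD with one chunk of contiguous columns per
// partition.  Keep only the partitions that cover columns j..j+50.
val wantedPartitions = Set(12, 13)
val window = new PartitionPruningRDD(colChunks, (i: Int) => wantedPartitions.contains(i))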


On Fri, Dec 20, 2013 at 9:40 AM, Aureliano Buendia wrote:

> Hi,
>
> I have a 100 x 1,000,000 matrix of double values, and I want to perform
> distributed computing on
> a 'window' of 100 x 50, where the window starts at each column. That is,
> each task must have access to columns j to j+50.
>
> Spark examples only come with accessing a single row per task. Is it
> possible to have access to a small part of the matrix?
>


reduce times

2013-12-19 Thread Tom Vacek
Hi all,

I implemented the TRON algorithm for logistic regression from Liblinear in
Spark.  The method is a Newton method, so it converges in a relatively
small number of iterations, each of which costs only about 2x a gradient
computation.  The distributed portion of the algorithm is a Hessian-vector
product, which can be computed as X^T (D (Xv) ), where X is the data matrix
and D is diagonal.  I've written this in the following way:

//sigs contains the D scalars.  A variable "data" is in scope, which is an
//RDD of a container class for a data point.
def getLossHessianVec(sigs: RDD[Double], v: Array[Double]): Array[Double] = {
  val vB = sc.broadcast(v)
  val accums = data.zipPartitions(sigs) { (dataIt, sigIt) =>
    val accum = Array.fill(vB.value.length)(0.0)
    dataIt.zip(sigIt).foreach { case (numData, sig) =>
      //do a dot product x_i dot w, scale by sig,
      //then accumulate each data example into accum,
      //scaling by the result of the previous line
    }
    Array(accum).iterator
  }.cache
  accums.count
  accums.reduce((x, y) => { (0 until x.length).foreach(ii => x(ii) += y(ii)); x })
}

Basically, the computation up to the count (2nd-to-last line) runs like I
would expect---a few hundred ms for a moderate dataset.  However, the last line
generally takes 4-5 times longer, even with a small number of workers (2 or
4), and with 200 workers the typical wall clock time was around 10
seconds for the reduce.  The accums are moderate-sized Arrays, like
3,000,000 Doubles.  This isn't an issue of scaling as much as it is one of
wall clock time, since a worker wouldn't be able to hold enough data to
amortize the cost of the reduces.

I originally had the code without the count statement---adding it made it
run faster.  The reduction is ugly, but it was as fast as I could find.

Any suggestions?

Thanks,

Tom


Re: code review - splitting columns

2013-11-18 Thread Tom Vacek
This is in response to your question about something in the API that
already does this.  You might want to keep your eye on MLI (
http://www.mlbase.org), which is a columnar table written for machine
learning but applicable to a lot of problems.  It's not perfect right now.


On Fri, Nov 15, 2013 at 7:56 PM, Aaron Davidson  wrote:

> Regarding only your last point, you could always split backwards to avoid
> having to worry about updated indices (i.e., split the highest index column
> first). But if you're additionally worried about efficiency, a combined
> approach could make more sense to avoid making two full passes on the data.
>
> Otherwise, I don't see anything particularly amiss here, but I'm no expert.
>
>
> On Wed, Nov 13, 2013 at 3:00 PM, Philip Ogren wrote:
>
>> Hi Spark community,
>>
>> I learned a lot the last time I posted some elementary Spark code here.
>>  So, I thought I would do it again.  Someone politely tell me offline if
>> this is noise or unfair use of the list!  I acknowledge that this borders
>> on asking Scala 101 questions
>>
>> I have an RDD[List[String]] corresponding to columns of data and I want
>> to split one of the columns using some arbitrary function and return an RDD
>> updated with the new columns.  Here is the code I came up with.
>>
>> def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int,
>> numSplits: Int, splitFx: String => List[String]): RDD[List[String]] = {
>>
>> def insertColumns(columns: List[String]) : List[String] = {
>>   val split = columns.splitAt(columnIndex)
>>   val left = split._1
>>   val splitColumn = split._2.head
>>   val splitColumns = splitFx(splitColumn).padTo(numSplits,
>> "").take(numSplits)
>>   val right = split._2.tail
>>   left ++ splitColumns ++ right
>> }
>>
>> columnsRDD.map(columns => insertColumns(columns))
>>   }
>>
>> Here is a simple test that demonstrates the behavior:
>>
>>   val spark = new SparkContext("local", "test spark")
>>   val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"),
>> List("5.6", "f"))
>>   var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
>>   testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
>>   testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList) //Line 5
>>   val actualStrings = testRDD.collect.toList
>>   assertEquals(4, actualStrings(0).length)
>>   assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
>>   assertEquals(4, actualStrings(1).length)
>>   assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
>>   assertEquals(4, actualStrings(2).length)
>>   assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))
>>
>>
>> My first concern about this code is that I'm missing out on something
>> that does exactly this in the API.  This seems like such a common use case
>> that I would not be surprised if there's a readily available way to do this.
>>
>> I'm a little uncertain about the typing of splitColumn - i.e. the first
>> parameter and the return value.  It seems like a general solution wouldn't
>> require every column to be a String value.  I'm also annoyed that line 5 in
>> the test code requires that I use an updated index to split what was
>> originally the second column.  This suggests that perhaps I should split
>> all the columns that need splitting in one function call - but it seems
>> like doing that would require an unwieldy function signature.
>>
>> Any advice or insight is appreciated!
>>
>> Thanks,
>> Philip
>>
>
>


Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Patrick, you got me thinking, but I'm sticking to my opinion that
reduceByKey should be avoided if possible.  I tried some timings:

def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1e6 + " msecs")   // ns -> msecs
  res
}
val sparsity=.001
val rows = sc.parallelize(1 to 1000).mapPartitionsWithIndex( (id, it)
=> {val rng = new scala.util.Random(id+42); it.map(row => (0 until
1).filter(i => rng.nextDouble>1-sparsity).map(i => (i,1)) )}
).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache

rows.count
rowsFlat.count

val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs

val cSums2 = time( rows.mapPartitions(it =>
Array(it.foldLeft(Array.fill(1)(0))((acc,nn) =>
{nn.foreach(tt=>acc(tt._1)+=1); acc})).iterator).reduce( (r1,r2) =>
r1.zip(r2).map(tt => tt._1 + tt._2)))
//Elapsed time 206.962364 msecs

These are the best times over a small number of runs, but average case
showed the same behavior.
The merge reduction I had suggested was not even close, which doesn't
surprise me much on second thought.

At sparsity=.01, the times are 2447 v. 394.

Lesson 1: You would care about this in an iterative algorithm, but not in a
one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of
elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and
broadcast.



On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren wrote:

> Thank you for the pointers.  I'm not sure I was able to fully understand
> either of your suggestions but here is what I came up with.  I started with
> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
>  Any thoughts about my updated solution are more than welcome!  I added
> local variable types for clarity.
>
>   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
> //split by tab and zip with index to give column value, column index
> pairs
> val sparse : RDD[(String, Int)] = tsv.flatMap(line =>
> line.split("\t").zipWithIndex)
> //filter out all the zero length values
> val dense : RDD[(String, Int)] = sparse.filter(valueIndex =>
> valueIndex._1.length>0)
> //map each column index to one and do the usual reduction
> dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
>   }
>
> Of course, this can be condensed to a single line but it doesn't seem as
> easy to read as the more verbose code above.  Write-once code like the
> following is why I never liked Perl
>
>   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
> tsv.flatMap(_.split("\t").zipWithIndex).filter(ci =>
> ci._1.length>0).map(ci => (ci._2, 1)).reduceByKey(_+_)
>   }
>
> Thanks,
> Philip
>
>
>
> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>
>> Hey Tom,
>>
>> reduceByKey will reduce locally on all the nodes, so there won't be
>> any data movement except to combine totals at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek  wrote:
>>
>>> Your example requires each row to be exactly the same length, since
>>> zipped
>>> will truncate to the shorter of its two arguments.
>>>
>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>> of
>>> data around to sort the keys.  I suspect it would be a lot slower.  But
>>> you
>>> could save yourself from adding up a bunch of zeros:
>>>
>>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>>> sparseRows.reduce(mergeAdd(_,_))
>>>
>>> You'll have to write a mergeAdd function.  This might not be any faster,
>>> but
>>> it does allow variable length rows.
>>>
>>>
>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell 
>>> wrote:
>>>
>>>> It would be a bit more straightforward to write it like this:
>>>>
>>>> val columns = [same as before]
>>>>
>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>> column).reduceByKey(_+ _)
>>>>
>>>> Basically look at each row and emit several records using flatMap.
>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>> whether it's present.
>>>>
>>>> Then you reduce by key to get the per-column count. Then you can
>>>> collect at the end

Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Messed up.  It should be:
 val sparseRows = spark.textFile("myfile.tsv").map(line =>
   line.split("\t").zipWithIndex.flatMap(tt => if (tt._1.length > 0) Some((tt._2, 1)) else None))
Then reduce with a mergeAdd.
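
For completeness, one possible mergeAdd over those sparse
(columnIndex, count) rows (just a sketch; it relies on both inputs being
sorted by column index, which zipWithIndex preserves):

// Merge two sorted arrays of (columnIndex, count), summing counts where the
// indices collide.
def mergeAdd(a: Array[(Int, Int)], b: Array[(Int, Int)]): Array[(Int, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i)._1 == b(j)._1)     { out += ((a(i)._1, a(i)._2 + b(j)._2)); i += 1; j += 1 }
    else if (a(i)._1 < b(j)._1) { out += a(i); i += 1 }
    else                        { out += b(j); j += 1 }
  }
  while (i < a.length) { out += a(i); i += 1 }
  while (j < b.length) { out += b(j); j += 1 }
  out.toArray
}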


On Fri, Nov 8, 2013 at 3:35 PM, Tom Vacek  wrote:

> Your example requires each row to be exactly the same length, since zipped
> will truncate to the shorter of its two arguments.
>
> The second solution is elegant, but reduceByKey involves flying a bunch of
> data around to sort the keys.  I suspect it would be a lot slower.  But you
> could save yourself from adding up a bunch of zeros:
>
>  val sparseRows = spark.textFile("myfile.tsv").map(line =>
> line.split("\t").zipWithIndex.filter(_._1.length>0))
> sparseRows.reduce(mergeAdd(_,_))
>
> You'll have to write a mergeAdd function.  This might not be any faster,
> but it does allow variable length rows.
>
>
> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell wrote:
>
>> It would be a bit more straightforward to write it like this:
>>
>> val columns = [same as before]
>>
>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>> column).reduceByKey(_+ _)
>>
>> Basically look at each row and emit several records using flatMap.
>> Each record has an ID for the column (maybe its index) and a flag for
>> whether it's present.
>>
>> Then you reduce by key to get the per-column count. Then you can
>> collect at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren 
>> wrote:
>> > Hi Spark coders,
>> >
>> > I wrote my first little Spark job that takes columnar data and counts
>> up how
>> > many times each column is populated in an RDD.  Here is the code I came
>> up
>> > with:
>> >
>> > //RDD of List[String] corresponding to tab delimited values
>> > val columns = spark.textFile("myfile.tsv").map(line =>
>> > line.split("\t").toList)
>> > //RDD of List[Int] corresponding to populated columns (1 for
>> populated
>> > and 0 for not populated)
>> > val populatedColumns = columns.map(row => row.map(column =>
>> > if(column.length > 0) 1 else 0))
>> > //List[Int] contains sums of the 1's in each column
>> > val counts = populatedColumns.reduce((row1,row2)
>> > =>(row1,row2).zipped.map(_+_))
>> >
>> > Any thoughts about the fitness of this code snippet?  I'm a little
>> annoyed
>> > by creating an RDD full of 1's and 0's in the second line.  The if
>> statement
>> > feels awkward too.  I was happy to find the zipped method for the reduce
>> > step.  Any feedback you might have on how to improve this code is
>> > appreciated.  I'm a newbie to both Scala and Spark.
>> >
>> > Thanks,
>> > Philip
>> >
>>
>
>


Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Your example requires each row to be exactly the same length, since zipped
will truncate to the shorter of its two arguments.

The second solution is elegant, but reduceByKey involves flying a bunch of
data around to sort the keys.  I suspect it would be a lot slower.  But you
could save yourself from adding up a bunch of zeros:

 val sparseRows = spark.textFile("myfile.tsv").map(line =>
line.split("\t").zipWithIndex.filter(_._1.length>0))
sparseRows.reduce(mergeAdd(_,_))

You'll have to write a mergeAdd function.  This might not be any faster,
but it does allow variable length rows.


On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell  wrote:

> It would be a bit more straightforward to write it like this:
>
> val columns = [same as before]
>
> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
> column).reduceByKey(_+ _)
>
> Basically look at each row and emit several records using flatMap.
> Each record has an ID for the column (maybe its index) and a flag for
> whether it's present.
>
> Then you reduce by key to get the per-column count. Then you can
> collect at the end.
>
> - Patrick
>
> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren 
> wrote:
> > Hi Spark coders,
> >
> > I wrote my first little Spark job that takes columnar data and counts up
> how
> > many times each column is populated in an RDD.  Here is the code I came
> up
> > with:
> >
> > //RDD of List[String] corresponding to tab delimited values
> > val columns = spark.textFile("myfile.tsv").map(line =>
> > line.split("\t").toList)
> > //RDD of List[Int] corresponding to populated columns (1 for
> populated
> > and 0 for not populated)
> > val populatedColumns = columns.map(row => row.map(column =>
> > if(column.length > 0) 1 else 0))
> > //List[Int] contains sums of the 1's in each column
> > val counts = populatedColumns.reduce((row1,row2)
> > =>(row1,row2).zipped.map(_+_))
> >
> > Any thoughts about the fitness of this code snippet?  I'm a little
> annoyed
> > by creating an RDD full of 1's and 0's in the second line.  The if
> statement
> > feels awkward too.  I was happy to find the zipped method for the reduce
> > step.  Any feedback you might have on how to improve this code is
> > appreciated.  I'm a newbie to both Scala and Spark.
> >
> > Thanks,
> > Philip
> >
>


Re: Stage failures

2013-10-28 Thread Tom Vacek
Yes, I looked at the log, and the serialized tasks were about 2k bytes as
well.  Is there anything I can do to move this along?


On Thu, Oct 24, 2013 at 2:05 PM, Josh Rosen  wrote:

> Maybe this is a bug in the ClosureCleaner.  If you look at the
>
> 13/10/23 14:16:39 INFO cluster.ClusterTaskSetManager: Serialized task
>> 0.0:263 as 39625334 bytes in 55 ms
>
>
> line in your log, this corresponds to the driver serializing a ~38
> megabyte task.  This suggests that the broadcasted data is accidentally
> being included in the serialized task.
>
> When I tried this locally with val lb = sc.broadcast((1 to
> 500).toSet), I saw a serialized task size of 1814 bytes.
>
>
> On Thu, Oct 24, 2013 at 10:59 AM, Tom Vacek wrote:
>
>> I've figured out what the problem is, but I don't understand why.  I'm
>> hoping somebody can explain this:
>>
>> (in the spark shell)
>> val lb = sc.broadcast( (1 to 1000).toSet)
>> val breakMe = sc.parallelize(1 to 250).mapPartitions( it => {val
>> serializedSet = lb.value.toString; Array(0).iterator}).count  //works great
>>
>> val ll = (1 to 1000).toSet
>> val lb = sc.broadcast(ll)
>> val breakMe = sc.parallelize(1 to 250).mapPartitions( it => {val
>> serializedSet = lb.value.toString; Array(0).iterator}).count  //Crashes
>> ignominiously
>>
>>
>>
>


Re: Stage failures

2013-10-24 Thread Tom Vacek
I've figured out what the problem is, but I don't understand why.  I'm
hoping somebody can explain this:

(in the spark shell)
val lb = sc.broadcast( (1 to 1000).toSet)
val breakMe = sc.parallelize(1 to 250).mapPartitions( it => {val
serializedSet = lb.value.toString; Array(0).iterator}).count  //works great

val ll = (1 to 1000).toSet
val lb = sc.broadcast(ll)
val breakMe = sc.parallelize(1 to 250).mapPartitions( it => {val
serializedSet = lb.value.toString; Array(0).iterator}).count  //Crashes
ignominiously
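
For illustration, a sketch of a workaround along the lines of the working
case above (the assumption being that in the second form the shell wrapper
drags the plain ll value into the serialized task; building the set inside
the broadcast call keeps it out of a top-level shell val):

val lb = sc.broadcast {
  (1 to 1000).toSet   // never bound to a top-level shell val
}
val ok = sc.parallelize(1 to 250).mapPartitions { it =>
  val serializedSet = lb.value.toString   // touch the broadcast on the worker
  Iterator(0)
}.count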


Re: Stage failures

2013-10-23 Thread Tom Vacek
Follow up: I have some more details about this problem.  The problem can be
replicated with a toy dataset:
val lookup = (1 to 1000).toSet
val lB = sc.broadcast(lookup)
val data = sc.parallelize(1 to 1000).map(i => (1 to 500).toArray)
val dataSel = data.map(vv => vv.filter(lB.value.contains(_)))
dataSel.count

This is designed to emulate feature selection on a 10,000,000-example
dataset.  If the lookup is small, this will run successfully.  When the
lookup becomes sufficiently large, the problem arises.  Sufficiently large
isn't all that big (< 1G).  I've included a snippet from the log messages.
 We've tried increasing spark.akka.askTimeout=20, but this only seems to
increase the delay in the logging stream between the INFO message for the
executor failure and the stack trace.  The problem always appears near a GC
on the driver.  As I said before, there is nothing out of the ordinary in
stdout/stderr on the workers, and the node logs show task restarts, but
nothing of use.  I would appreciate any suggestions for debugging this
further.  Thank you in advance.  Here is the log output:



13/10/23 14:16:39 INFO cluster.ClusterTaskSetManager: Serialized task
0.0:263 as 39625334 bytes in 55 ms
13/10/23 14:16:39 INFO client.Client$ClientActor: Executor updated:
app-20131023141253-0011/9 is now FAILED (Command exited with code 1)
13/10/23 14:16:39 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20131023141253-0011/9 removed: Command exited with code 1
[GC
Desired survivor size 5596774400 bytes, new threshold 4 (max 15)
 [PSYoungGen: 19318548K->4461016K(20650496K)]
25045771K->11103784K(70982144K), 0.6651370 secs] [Times: user=8.74
sys=3.09, real=0.66 secs]
[GC
Desired survivor size 5615648768 bytes, new threshold 3 (max 15)
 [PSYoungGen: 20640075K->3083947K(18713088K)]
27282843K->11130718K(69044736K), 0.9221720 secs] [Times: user=8.82
sys=7.61, real=0.92 secs]
[GC
Desired survivor size 5275910144 bytes, new threshold 2 (max 15)
 [PSYoungGen: 17246485K->2101931K(19681792K)]
25293256K->11155006K(70013440K), 0.3125910 secs] [Times: user=4.82
sys=0.73, real=0.31 secs]
[GC
Desired survivor size 5356126208 bytes, new threshold 1 (max 15)
 [PSYoungGen: 16270913K->41254K(19856896K)]
25323988K->11148757K(70188544K), 0.2097400 secs] [Times: user=2.45
sys=1.26, real=0.21 secs]
13/10/23 14:16:59 INFO client.Client$ClientActor: Connecting to master
spark://sanji-03:7077
13/10/23 14:16:59 ERROR client.Client$ClientActor: Error notifying
standalone scheduler's driver actor
org.apache.spark.SparkException: Error notifying standalone scheduler's
driver actor
at
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:192)
at
org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:90)
at
org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:92)
at
org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:72)
at akka.actor.Actor$class.apply(Actor.scala:318)
at org.apache.spark.deploy.client.Client$ClientActor.apply(Client.scala:51)
at akka.actor.ActorCell.invoke(ActorCell.scala:626)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
at akka.dispatch.Mailbox.run(Mailbox.scala:179)
at
akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[2] milliseconds
at akka.dispatch.DefaultPromise.ready(Future.scala:870)
at akka.dispatch.DefaultPromise.result(Future.scala:874)
at akka.dispatch.Await$.result(Future.scala:74)
at
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:189)
... 13 more



On Tue, Oct 22, 2013 at 3:17 PM, Tom Vacek  wrote:

> I have a simple code snippet for the shell.  I'm running 0.8.0, and this
> happens with both the Spark master and Mesos.  Basically, I'm just reading
> a local file from the login node, broadcasting the contents as a set, and
> then filtering a list embedded in an RDD.  However, the stage fails every
> time I have run it.  I'm looking for advice about the problem.  I suspect
> there is a misconfiguration on the cluster, but I have no idea where to
> start.  Any suggestions are appreciated.  Code snippet and log messages
> follow.
>
> val wordLookup = scala.io.Source.fromFile("/data/share/rnaTokLookup",
> "latin1").getLines().toList
>
> val rnaToks = wordLookup.map(ss => {val chunks = ss.split("\\t");
> chunks(0) } ).toSet

Re: Visitor function to RDD elements

2013-10-22 Thread Tom Vacek
Unfortunately, I think you're going to either have to fly a lot of data
around or create a lot of garbage.
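
For what it's worth, a rough sketch of the "fly a lot of data around"
option: pull one partition at a time to the driver, in order (each pass
re-evaluates the lineage unless the RDD is cached, so it is as expensive as
it looks; visit is a hypothetical callback):

def visitInOrder(rdd: org.apache.spark.rdd.RDD[String])(visit: String => Unit): Unit = {
  for (p <- 0 until rdd.partitions.length) {
    // Keep only partition p and collect it; within a partition the original
    // order is preserved.
    val chunk = rdd.mapPartitionsWithIndex { (i, it) =>
      if (i == p) it else Iterator.empty
    }.collect()
    chunk.foreach(visit)
  }
}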


On Tue, Oct 22, 2013 at 3:36 PM, Patrick Wendell  wrote:

> Hey Matt,
>
> It seems like you are trying to perform an operation that just isn't
> parrallelizable. In that case, it's going to be tricky without collecting
> the entire dataset on one node.
>
> Spark does not expose an iterator like you are suggesting, that lets you
> traverse an RDD. You could build one yourself though by collecting one
> partition at a time at the driver, thought this would require some lower
> level understanding of Spark.
>
> - Patrick
>
>
>
> On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah  wrote:
>
>>  In this context, it would be able to create a visitor mapping for each
>> partition. However, I'm looking for the ability to use a single visitor
>> object that will walk over all partitions.
>>
>>  I suppose I could do this if I used coalesce() to combine everything to
>> one partition but that's too much memory in one partition. Am I
>> misinterpreting how to use it?
>>
>>   From: Mark Hamstra 
>> Reply-To: "user@spark.incubator.apache.org" <
>> user@spark.incubator.apache.org>
>> Date: Tuesday, October 22, 2013 12:51 PM
>> To: user 
>> Subject: Re: Visitor function to RDD elements
>>
>>   mapPartitions
>> mapPartitionsWithIndex
>>
>>  With care, you can use these and maintain the iteration order within
>> partitions.  Beware, though, that any reduce functions need to be
>> associative and commutative.
>>
>>
>> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah  wrote:
>>
>>>  Hi everyone,
>>>
>>>  I have a driver holding a reference to an RDD. The driver would like
>>> to "visit" each item in the RDD in order, say with a visitor object that
>>> invokes visit(item) to modify that visitor's internal state. The visiting
>>> is not commutative (e.g. Visiting item A then B makes a different internal
>>> state from visiting item B then item A). Items in the RDD also are not
>>> necessarily distinct.
>>>
>>>  I've looked into accumulators which don't work because they require
>>> the operation to be commutative. Collect() will not work because the RDD is
>>> too large; in general, bringing the whole RDD into one partition won't work
>>> since the RDD is too large.
>>>
>>>  Is it possible to iterate over the items in an RDD in order without
>>> bringing the entire dataset into a single JVM at a time, and/or obtain
>>> chunks of the RDD in order on the driver? We've tried using the internal
>>> iterator() method. In some cases, we get a stack trace (running locally
>>> with 3 threads). I've included the stack trace below.
>>>
>>>  Thanks,
>>>
>>>  -Matt Cheah
>>>
>>>  org.apache.spark.SparkException: Error communicating with
>>> MapOutputTracker
>>> at
>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>> at
>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>> at
>>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at
>>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>>> at
>>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>>> at
>>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>>> at
>>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>>> at
>>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>

Stage failures

2013-10-22 Thread Tom Vacek
I have a simple code snippet for the shell.  I'm running 0.8.0, and this
happens with both the Spark master and Mesos.  Basically, I'm just reading
a local file from the login node, broadcasting the contents as a set, and
then filtering a list embedded in an RDD.  However, the stage fails every
time I have run it.  I'm looking for advice about the problem.  I suspect
there is a misconfiguration on the cluster, but I have no idea where to
start.  Any suggestions are appreciated.  Code snippet and log messages
follow.

val wordLookup = scala.io.Source.fromFile("/data/share/rnaTokLookup",
"latin1").getLines().toList

val rnaToks = wordLookup.map(ss => {val chunks = ss.split("\\t"); chunks(0)
} ).toSet

val rnaToksB = sc.broadcast(rnaToks)

val text = sc.textFile("hdfs://sanji-03/user/tom/rnaHuge/docVecs")

val ngrams = text.map(tt => {val blobs = tt.split("\\t"); (blobs(0),
blobs(1).split(" "))})
//ngrams: [(String, Array[String])]
val ngramsLight = ngrams.map(tt => (tt._1,
tt._2.filter(rnaToksB.value.contains(_))))

ngramsLight.map(tt => tt._1 + "\t" + tt._2.mkString("
")).saveAsTextFile("hdfs://sanji-03/user/tom/rnaHuge/docVecsLight")

It runs just fine until it hits:
13/10/22 13:55:55 INFO client.Client$ClientActor: Executor updated:
app-20131022135234-0033/5 is now FAILED (Command exited with code 1)
13/10/22 13:55:55 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20131022135234-0033/5 removed: Command exited with code 1
13/10/22 13:56:05 INFO client.Client$ClientActor: Connecting to master
spark://sanji-03:7077
13/10/22 13:56:05 ERROR client.Client$ClientActor: Error notifying
standalone scheduler's driver actor
org.apache.spark.SparkException: Error notifying standalone scheduler's
driver actor
at
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:192)
at
org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:90)
at
org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:92)
at
org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:72)
at akka.actor.Actor$class.apply(Actor.scala:318)
at org.apache.spark.deploy.client.Client$ClientActor.apply(Client.scala:51)
at akka.actor.ActorCell.invoke(ActorCell.scala:626)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
at akka.dispatch.Mailbox.run(Mailbox.scala:179)
at
akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1] milliseconds
at akka.dispatch.DefaultPromise.ready(Future.scala:870)
at akka.dispatch.DefaultPromise.result(Future.scala:874)
at akka.dispatch.Await$.result(Future.scala:74)
at
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:189)
... 13 more

After this, the node that failed is brought back and assigned a new task,
and a similar train of messages is sprayed over and over again, generally
involving different nodes.  The node log for this instance shows a number
of restarts, which have the following pattern:
13/10/22 13:32:33 INFO worker.Worker: Asked to launch executor
app-20131022133233-0024/5 for Spark shell
13/10/22 13:32:33 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
"/data/scala/scala-0.8.0-incubating/lib/scala-library.jar:/data/spark/spark-0.8.0-incubating/conf:/data/spark/spark-0.8.0-incubating/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar"
"-Dspark.local.dir=/data/spark/tmp-0.8.0" "-Dspark.worker.timeout=12"
"-Dspark.local.dir=/data/spark/tmp-0.8.0" "-Dspark.worker.timeout=12"
"-Xms73728M" "-Xmx73728M"
"org.apache.spark.executor.StandaloneExecutorBackend" "akka://
sp...@sanji-03.int.westgroup.com:36446/user/StandaloneScheduler" "5" "
sanji-08.int.westgroup.com" "22"
13/10/22 13:37:38 INFO worker.Worker: Asked to kill executor
app-20131022133233-0024/5
13/10/22 13:37:38 INFO worker.ExecutorRunner: Killing process!
13/10/22 13:37:38 INFO worker.ExecutorRunner: Runner thread for executor
app-20131022133233-0024/5 interrupted
13/10/22 13:37:38 INFO worker.ExecutorRunner: Redirection to
/data/spark/spark-0.8.0-incubating/work/app-20131022133233-0024/5/stderr
closed: Stream closed
13/10/22 13:37:38 INFO worker.Worker: Executor app-20131022133233-0024/5
finished with state KILLED