Re: code review - splitting columns

2013-11-18 Thread Tom Vacek
This is in response to your question about something in the API that
already does this.  You might want to keep your eye on MLI (
http://www.mlbase.org), which is a columnar table written for machine
learning but applicable to a lot of problems.  It's not perfect right now.


On Fri, Nov 15, 2013 at 7:56 PM, Aaron Davidson <ilike...@gmail.com> wrote:

 Regarding only your last point, you could always split backwards to avoid
 having to worry about updated indices (i.e., split the highest index column
 first). But if you're additionally worried about efficiency, a combined
 approach could make more sense to avoid making two full passes on the data.
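
 For concreteness, a rough (untested) sketch of the combined, single-pass
 approach might look like the following; the splitColumns name and the
 (index, numSplits, splitFx) spec tuples are just hypothetical, and it
 reuses the padding logic from your insertColumns:

 def splitColumns(rdd: RDD[List[String]],
     specs: Seq[(Int, Int, String => List[String])]): RDD[List[String]] = {
   // Apply the highest column index first, so earlier splits never
   // shift the indices of columns that still need to be split.
   val ordered = specs.sortBy(-_._1)
   // A single map over the data applies every split in one pass.
   rdd.map { columns =>
     ordered.foldLeft(columns) { case (cols, (i, n, fx)) =>
       val (left, rest) = cols.splitAt(i)
       left ++ fx(rest.head).padTo(n, "").take(n) ++ rest.tail
     }
   }
 }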

 Otherwise, I don't see anything particularly amiss here, but I'm no expert.


 On Wed, Nov 13, 2013 at 3:00 PM, Philip Ogren <philip.og...@oracle.com> wrote:

 Hi Spark community,

 I learned a lot the last time I posted some elementary Spark code here.
  So, I thought I would do it again.  Someone politely tell me offline if
 this is noise or unfair use of the list!  I acknowledge that this borders
 on asking Scala 101 questions.

 I have an RDD[List[String]] corresponding to columns of data and I want
 to split one of the columns using some arbitrary function and return an RDD
 updated with the new columns.  Here is the code I came up with.

 def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int,
     numSplits: Int, splitFx: String => List[String]): RDD[List[String]] = {

   def insertColumns(columns: List[String]): List[String] = {
     val split = columns.splitAt(columnIndex)
     val left = split._1
     val splitColumn = split._2.head
     // Pad with "" (or truncate) so exactly numSplits values are produced.
     val splitColumns = splitFx(splitColumn).padTo(numSplits, "").take(numSplits)
     val right = split._2.tail
     left ++ splitColumns ++ right
   }

   columnsRDD.map(columns => insertColumns(columns))
 }

 Here is a simple test that demonstrates the behavior:

   val spark = new SparkContext("local", "test spark")
   val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"),
     List("5.6", "f"))
   var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
   testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
   testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList) //Line 5
   val actualStrings = testRDD.collect.toList
   assertEquals(4, actualStrings(0).length)
   assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
   assertEquals(4, actualStrings(1).length)
   assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
   assertEquals(4, actualStrings(2).length)
   assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))


 My first concern about this code is that I may simply be overlooking
 something in the API that already does exactly this.  This seems like such a
 common use case that I would not be surprised if there's a readily available
 way to do it.

 I'm a little uncertain about the typing of splitColumn - i.e. the first
 parameter and the return value.  It seems like a general solution wouldn't
 require every column to be a String value.  I'm also annoyed that line 5 in
 the test code requires that I use an updated index to split what was
 originally the second column.  This suggests that perhaps I should split
 all the columns that need splitting in one function call - but it seems
 like doing that would require an unwieldy function signature.

 Any advice or insight is appreciated!

 Thanks,
 Philip





code review - splitting columns

2013-11-13 Thread Philip Ogren

Hi Spark community,

I learned a lot the last time I posted some elementary Spark code here.  
So, I thought I would do it again.  Someone politely tell me offline if 
this is noise or unfair use of the list!  I acknowledge that this 
borders on asking Scala 101 questions.


I have an RDD[List[String]] corresponding to columns of data and I want 
to split one of the columns using some arbitrary function and return an 
RDD updated with the new columns.  Here is the code I came up with.


def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int,
    numSplits: Int, splitFx: String => List[String]): RDD[List[String]] = {

  def insertColumns(columns: List[String]): List[String] = {
    val split = columns.splitAt(columnIndex)
    val left = split._1
    val splitColumn = split._2.head
    // Pad with "" (or truncate) so exactly numSplits values are produced.
    val splitColumns = splitFx(splitColumn).padTo(numSplits, "").take(numSplits)
    val right = split._2.tail
    left ++ splitColumns ++ right
  }

  columnsRDD.map(columns => insertColumns(columns))
}

Here is a simple test that demonstrates the behavior:

  val spark = new SparkContext("local", "test spark")
  val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"),
    List("5.6", "f"))
  var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
  testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
  testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList) //Line 5
  val actualStrings = testRDD.collect.toList
  assertEquals(4, actualStrings(0).length)
  assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
  assertEquals(4, actualStrings(1).length)
  assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
  assertEquals(4, actualStrings(2).length)
  assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))


My first concern about this code is that I may simply be overlooking 
something in the API that already does exactly this.  This seems like such 
a common use case that I would not be surprised if there's a readily 
available way to do it.


I'm a little uncertain about the typing of splitColumn - i.e. the first 
parameter and the return value.  It seems like a general solution 
wouldn't require every column to be a String value.  I'm also annoyed 
that line 5 in the test code requires that I use an updated index to 
split what was originally the second column.  This suggests that perhaps 
I should split all the columns that need splitting in one function call 
- but it seems like doing that would require an unwieldy function signature.
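
For illustration, here is roughly what a more generic version might look 
like (untested); the padValue parameter is new, since the hard-coded 
empty-string padding only makes sense for String columns:

import scala.reflect.ClassTag

// The ClassTag bound may or may not be required by RDD.map,
// depending on the Spark/Scala version.
def splitColumn[T: ClassTag](columnsRDD: RDD[List[T]], columnIndex: Int,
    numSplits: Int, padValue: T, splitFx: T => List[T]): RDD[List[T]] = {
  columnsRDD.map { columns =>
    val (left, rest) = columns.splitAt(columnIndex)
    // Same logic as above, but padding with a caller-supplied value.
    left ++ splitFx(rest.head).padTo(numSplits, padValue).take(numSplits) ++
      rest.tail
  }
}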


Any advice or insight is appreciated!

Thanks,
Philip