(1) I understand about immutability; that's why I said I wanted a new SchemaRDD. (2) I specifically asked for a non-SQL solution that takes a SchemaRDD and results in a new SchemaRDD with one new function. (3) The DSL stuff is a big clue, but I can't find adequate documentation for it.
What I'm looking for is something like:

import org.apache.spark.sql._
val sqlc = new SQLContext(sc)
import sqlc._

val data = sc.parallelize(0 to 99).map(n =>
  ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")

val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")

This sqlVersion works fine, but if I try to do the same thing with a programmatic function, I'm missing a bunch of pieces. I assume I'd need to start with something like:

jdata.select('*, 'seven.mod, 'eleven.mod)

and then get and process the last two elements. The problems are:

- I can't select '* - there seems to be no way to get the complete row
- I can't select 'seven.mod or 'eleven.mod - the symbol evaluation seems to go only one level deep
- Assuming I could do that, I don't see a way to make the result into a SchemaRDD

I assume I would have to do something like:

1. take my row and value, and create a new, slightly longer row
2. take my old schema, and create a new schema with one more field at the end, named and typed appropriately
3. combine the two into a SchemaRDD

I think I see how to do 3, but 1 and 2 elude me.

Is there more complete documentation somewhere for the DSL portion? Anyone have a clue about any of the above?

On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang <yanboha...@gmail.com> wrote:

> An RDD is immutable, so you can not modify it.
> If you want to modify some value or the schema in an RDD, use map to
> generate a new RDD. The following code is for your reference:
>
> def add(a: Int, b: Int): Int = {
>   a + b
> }
>
> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
> d2.foreach(println)
>
> Otherwise, if your self-defined function is straightforward and you can
> represent it in SQL, using Spark SQL or the DSL is also a good choice.
> case class Person(id: Int, score: Int, value: Int)
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext._
>
> val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) }
> val d2 = d1.select('id, 'score, 'id + 'score)
> d2.foreach(println)
>
> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld <nkronenf...@oculusinfo.com>:
>
>> Hi, there.
>>
>> I'm trying to understand how to augment data in a SchemaRDD.
>>
>> I can see how to do it if I can express the added values in SQL - just run
>> "SELECT *, valueCalculation AS newColumnName FROM table"
>>
>> I've been searching all over for how to do this if my added value is a
>> scala function, with no luck.
>>
>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add
>> a new column, D, calculated using Utility.process(b, c), and I want (of
>> course) to pass in the values B and C from each row, ending up with a new
>> SchemaRDD with columns A, B, C, and D.
>>
>> Is this possible? If so, how?
>>
>> Thanks,
>> -Nathan
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone: +1-416-203-3003 x 238
>> Email: nkronenf...@oculusinfo.com

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
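[Editor's sketch] The three numbered steps in the message above (longer row, longer schema, recombine) can be sketched roughly as follows. This is an untested sketch, not a definitive implementation: it assumes the Spark 1.2-era programmatic-schema API (Row, StructType, StructField, IntegerType, and SQLContext.applySchema, all re-exported from org.apache.spark.sql at the time), and addColumn is a made-up helper name, not a real Spark method. It requires a running SparkContext.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

// Hypothetical helper (not a Spark API): append one computed Int column to a
// SchemaRDD, producing a new SchemaRDD. Relies on Row in Spark 1.2 behaving as
// a Seq[Any]; exact names/signatures may differ in other Spark versions.
def addColumn(sqlc: SQLContext, in: SchemaRDD, name: String)(f: Row => Int): SchemaRDD = {
  // 1. take each row and its computed value, and build a new, slightly longer row
  val rows: RDD[Row] = in.map(row => Row(row.toSeq :+ f(row): _*))
  // 2. take the old schema, and build a new schema with one more field at the
  //    end, named and typed appropriately (Int here, for simplicity)
  val schema = StructType(in.schema.fields :+ StructField(name, IntegerType, nullable = false))
  // 3. combine the two back into a SchemaRDD
  sqlc.applySchema(rows, schema)
}
```

Against the jdata example above, this would be called as something like `addColumn(sqlc, jdata, "modsum") { row => ... }`, where the body extracts the two nested mod values from the row (how to index into the nested seven/eleven structs depends on the row layout, which is the unresolved part of the question).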
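[Editor's sketch] Independent of Spark, the map-based suggestion quoted above boils down to widening each row by one computed element. A plain-Scala sketch of the original A/B/C/D question, where Utility.process is a stand-in defined here only for illustration (hypothetically b * c; the real function could be anything):

```scala
object Utility {
  // stand-in body for the real Utility.process(b, c); assumed here to be b * c
  def process(b: Int, c: Int): Int = b * c
}

// rows with columns A, B, C
val abc = List((1, 2, 3), (4, 5, 6))

// widen each row to A, B, C, D where D = Utility.process(B, C)
val abcd = abc.map { case (a, b, c) => (a, b, c, Utility.process(b, c)) }
// abcd == List((1, 2, 3, 6), (4, 5, 6, 30))
```

This preserves the data and appends the derived column, which is exactly what the map-over-tuples approach does on an RDD; what it does not do, and what the question is really about, is carry the schema along with it.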