Re: Adding a column to a SchemaRDD
Hi Nathan,

#1 Spark SQL's DSL can satisfy your requirement. You can refer to the following code snippet:

    jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))

You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.

#2 After you make the transform above, you do not need to build a SchemaRDD manually, because jdata.select() returns a SchemaRDD that you can operate on directly. For example, the following snippet returns a new SchemaRDD with a longer Row:

    val t1 = jdata.select(Star(None), 'seven.getField("mod") + 'eleven.getField("mod") as 'mod_sum)

You can use t1.printSchema() to print the schema of this SchemaRDD and check whether it satisfies your requirements.

2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

(1) I understand about immutability; that's why I said I wanted a new SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and results in a new SchemaRDD with one new column.
(3) The DSL stuff is a big clue, but I can't find adequate documentation for it.

What I'm looking for is something like:

    import org.apache.spark.sql._
    val sqlc = new SQLContext(sc)
    import sqlc._

    val data = sc.parallelize(0 to 99).map(n =>
      ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
       "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
    val jdata = sqlc.jsonRDD(data)
    jdata.registerTempTable("jdata")

    val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")

This sqlVersion works fine, but if I try to do the same thing with a programmatic function, I'm missing a bunch of pieces. I assume I'd need to start with something like:

    jdata.select('*, 'seven.mod, 'eleven.mod)

and then get and process the last two elements. The problems are:

- I can't select '* - there seems to be no way to get the complete row.
- I can't select 'seven.mod or 'eleven.mod - the symbol evaluation seems to go only one level deep.
- Assuming I could do that, I don't see a way to make the result into a SchemaRDD.
I assume I would have to do something like:

1. take my row and value, and create a new, slightly longer row
2. take my old schema, and create a new schema with one more field at the end, named and typed appropriately
3. combine the two into a SchemaRDD

I think I see how to do 3, but 1 and 2 elude me. Is there more complete documentation somewhere for the DSL portion? Anyone have a clue about any of the above?

On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang yanboha...@gmail.com wrote:

An RDD is immutable, so you cannot modify it. If you want to modify some value or the schema of an RDD, use map to generate a new RDD. The following code is for your reference:

    def add(a: Int, b: Int): Int = a + b

    val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
    val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
    d2.foreach(println)

Otherwise, if your self-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice:

    case class Person(id: Int, score: Int, value: Int)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) }
    val d2 = d1.select('id, 'score, 'id + 'score)
    d2.foreach(println)

2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

Hi, there. I'm trying to understand how to augment data in a SchemaRDD. I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how?
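[A sketch of Yanbo's DSL suggestion put together end to end, against the Spark 1.x SchemaRDD API; the column name mod_sum is illustrative, and this assumes the jdata SchemaRDD built from the JSON strings above:]

    // Sketch only: requires a running Spark 1.x SQLContext (sqlc) and jdata.
    import org.apache.spark.sql.catalyst.analysis.Star

    // Star(None) expands to all existing columns (the DSL's stand-in for "*"),
    // and getField reaches one level down into the nested structs.
    val withSum = jdata.select(
      Star(None),
      ('seven.getField("mod") + 'eleven.getField("mod")) as 'mod_sum)

    withSum.printSchema()  // should show the original fields plus mod_sum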
Thanks, -Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
Re: Adding a column to a SchemaRDD
Nathan,

On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D.

I guess you would have to do three things:

- schemardd.map(row => { /* extend the row here */ }), which will give you a plain RDD[Row] without a schema,
- take the schema from the schemardd and extend it manually with the name and type of the newly added column,
- create a new SchemaRDD from your mapped RDD and the manually extended schema.

Does that make sense?

Tobias
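[The three steps above can be sketched roughly as follows. This is a hedged sketch against the Spark 1.x SchemaRDD API, where SQLContext.applySchema rebuilds a SchemaRDD from an RDD[Row] plus a StructType; Utility.process stands in for any user Scala function, and the exact Row-construction helper varies across 1.x releases:]

    import org.apache.spark.sql._

    // 1. Extend each row with the computed value. In Spark 1.x, Row behaves
    //    like a Seq, so we can append the new value to its elements.
    //    (On some 1.x releases you may need Row(values: _*) instead of fromSeq.)
    val extendedRows = schemardd.map { row =>
      val d = Utility.process(row.getInt(1), row.getInt(2))  // B and C by position
      Row.fromSeq(row.toSeq :+ d)
    }

    // 2. Extend the old schema with one new field, named and typed appropriately.
    val newSchema = StructType(
      schemardd.schema.fields :+ StructField("D", IntegerType, nullable = false))

    // 3. Combine the two into a new SchemaRDD.
    val result = sqlContext.applySchema(extendedRows, newSchema)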
Re: Adding a column to a SchemaRDD
An RDD is immutable, so you cannot modify it. If you want to modify some value or the schema of an RDD, use map to generate a new RDD. The following code is for your reference:

    def add(a: Int, b: Int): Int = a + b

    val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
    val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
    d2.foreach(println)

Otherwise, if your self-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice:

    case class Person(id: Int, score: Int, value: Int)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) }
    val d2 = d1.select('id, 'score, 'id + 'score)
    d2.foreach(println)

2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

Hi, there. I'm trying to understand how to augment data in a SchemaRDD. I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how?

Thanks, -Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
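[Worth noting alongside the above: a plain Scala function can also be exposed to the SQL side directly. A hedged sketch against the Spark 1.x API, where SQLContext.registerFunction makes a closure callable from SQL; Utility.process and the table/column names are stand-ins for the original question's example:]

    // Register the Scala function under a SQL-callable name...
    sqlContext.registerFunction("process", (b: Int, c: Int) => Utility.process(b, c))

    // ...then call it like a built-in; the result is a new SchemaRDD with column D.
    val withD = sqlContext.sql("SELECT A, B, C, process(B, C) AS D FROM table")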
Re: Adding a column to a SchemaRDD
(1) I understand about immutability; that's why I said I wanted a new SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and results in a new SchemaRDD with one new column.
(3) The DSL stuff is a big clue, but I can't find adequate documentation for it.

What I'm looking for is something like:

    import org.apache.spark.sql._
    val sqlc = new SQLContext(sc)
    import sqlc._

    val data = sc.parallelize(0 to 99).map(n =>
      ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
       "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
    val jdata = sqlc.jsonRDD(data)
    jdata.registerTempTable("jdata")

    val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")

This sqlVersion works fine, but if I try to do the same thing with a programmatic function, I'm missing a bunch of pieces. I assume I'd need to start with something like:

    jdata.select('*, 'seven.mod, 'eleven.mod)

and then get and process the last two elements. The problems are:

- I can't select '* - there seems to be no way to get the complete row.
- I can't select 'seven.mod or 'eleven.mod - the symbol evaluation seems to go only one level deep.
- Assuming I could do that, I don't see a way to make the result into a SchemaRDD.

I assume I would have to do something like:

1. take my row and value, and create a new, slightly longer row
2. take my old schema, and create a new schema with one more field at the end, named and typed appropriately
3. combine the two into a SchemaRDD

I think I see how to do 3, but 1 and 2 elude me. Is there more complete documentation somewhere for the DSL portion? Anyone have a clue about any of the above?

On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang yanboha...@gmail.com wrote:

An RDD is immutable, so you cannot modify it. If you want to modify some value or the schema of an RDD, use map to generate a new RDD.
The following code is for your reference:

    def add(a: Int, b: Int): Int = a + b

    val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
    val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
    d2.foreach(println)

Otherwise, if your self-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice:

    case class Person(id: Int, score: Int, value: Int)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) }
    val d2 = d1.select('id, 'score, 'id + 'score)
    d2.foreach(println)

2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

Hi, there. I'm trying to understand how to augment data in a SchemaRDD. I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how?

Thanks, -Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com