(1) I understand about immutability, that's why I said I wanted a new
SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and
results in a new SchemaRDD with one new column.
(3) The DSL stuff is a big clue, but I can't find adequate documentation
for it.

What I'm looking for is something like:

import org.apache.spark.sql._


val sqlc = new SQLContext(sc)
import sqlc._


val data = sc.parallelize(0 to 99).map(n =>
    ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
      "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")


val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")


This sqlVersion works fine, but if I try to do the same thing with a
programmatic function, I'm missing a bunch of pieces:

   - I assume I'd need to start with something like:
   jdata.select('*, 'seven.mod, 'eleven.mod)
   and then get and process the last two elements.  The problems are:
      - I can't select '* - there seems to be no way to get the complete row
      - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
      seems to go only one level deep.
   - Assuming I could do that, I don't see a way to make the result into a
   SchemaRDD.  I assume I would have to do something like:
      1. take my row and value, and create a new, slightly longer row
      2. take my old schema, and create a new schema with one more field at
      the end, named and typed appropriately
      3. combine the two into a SchemaRDD
      I think I see how to do 3, but 1 and 2 elude me.
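
For what it's worth, steps 1 to 3 might be sketched roughly as below. This is
only a sketch under assumptions, not a confirmed recipe: it assumes the
Spark 1.2-era API (SQLContext.applySchema, SchemaRDD.schema, and a Row that
can be treated as a Seq[Any]), and addColumn and indexOf are hypothetical
helper names, not part of Spark. The usage part reuses sqlc and jdata from
the example above.

```scala
import org.apache.spark.sql._

// Hypothetical helper (not a Spark API): returns a new SchemaRDD with one
// extra column. Assumes the Spark 1.2-era SQLContext.applySchema method.
def addColumn(sqlc: SQLContext, in: SchemaRDD, name: String, dataType: DataType)
             (fcn: Row => Any): SchemaRDD = {
  // Step 1: build a slightly longer row from the old row plus the new value.
  val rows = in.map(row => Row((row.toSeq :+ fcn(row)): _*))
  // Step 2: copy the old schema and append one field, named and typed to match.
  val schema = StructType(in.schema.fields :+ StructField(name, dataType, true))
  // Step 3: combine the rows and the schema into a new SchemaRDD.
  sqlc.applySchema(rows, schema)
}

// Hypothetical helper: position of a named field within a struct.
def indexOf(st: StructType, name: String): Int =
  st.fields.indexWhere(_.name == name)

// Look positions up by name rather than assuming the inferred column order.
val top = jdata.schema
val si  = indexOf(top, "seven")
val ei  = indexOf(top, "eleven")
val smi = indexOf(top.fields(si).dataType.asInstanceOf[StructType], "mod")
val emi = indexOf(top.fields(ei).dataType.asInstanceOf[StructType], "mod")

val withModsum = addColumn(sqlc, jdata, "modsum", LongType) { row =>
  // Nested JSON objects are assumed to come back as nested Rows. The integer
  // type inferred from JSON may be Int or Long, so go through Number.
  val s = row(si).asInstanceOf[Row](smi).asInstanceOf[Number].longValue
  val e = row(ei).asInstanceOf[Row](emi).asInstanceOf[Number].longValue
  s + e
}
```

The function passed to addColumn sees the whole original row, so any Scala
function of the existing values could be plugged in the same way.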

Is there more complete documentation somewhere for the DSL portion? Anyone
have a clue about any of the above?



On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang <yanboha...@gmail.com> wrote:

> An RDD is immutable, so you cannot modify it.
> If you want to change a value or the schema of an RDD, use map to generate
> a new RDD.
> The following code for your reference:
>
> def add(a:Int,b:Int):Int = {
>   a + b
> }
>
> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
> d2.foreach(println)
>
>
> Otherwise, if your user-defined function is straightforward and you can
> represent it in SQL, using Spark SQL or the DSL is also a good choice.
>
> case class Person(id: Int, score: Int, value: Int)
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext._
>
> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
> val d2 = d1.select('id, 'score, 'id + 'score)
> d2.foreach(println)
>
>
> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld <nkronenf...@oculusinfo.com>:
>
>> Hi, there.
>>
>> I'm trying to understand how to augment data in a SchemaRDD.
>>
>> I can see how to do it if I can express the added values in SQL - just run
>> "SELECT *,valueCalculation AS newColumnName FROM table"
>>
>> I've been searching all over for how to do this if my added value is a
>> Scala function, with no luck.
>>
>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add
>> a new column, D, calculated using Utility.process(b, c), and I want (of
>> course) to pass in the value B and C from each row, ending up with a new
>> SchemaRDD with columns A, B, C, and D.
>>
>> Is this possible? If so, how?
>>
>> Thanks,
>>                    -Nathan
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com
