Oh sorry, I'm rereading your email more carefully.  Its only because you
have some setup code that you want to amortize?

On Mon, Jan 5, 2015 at 10:40 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The types expected by applySchema are documented in the type reference
> section:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference
>
> I'd certainly accept a PR to improve the docs and add a link to this from
> the applySchema section :)
>
> Can you explain why you are using mapPartitions and UDFs don't work for
> you?  SQL doesn't really have a great support for partitions in general...
> We do support for Hive TGFs though and we could possibly add better scala
> syntax for this concept or something else.
>
> On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> I have a SchemaRDD where I want to add a column with a value that is
>> computed from the rest of the row. As the computation involves a
>> network operation and requires setup code, I can't use
>>   "SELECT *, myUDF(*) FROM rdd",
>> but I wanted to use a combination of:
>>
>>  - get schema of input SchemaRDD
>>  - issue a mapPartitions call (including the setup code), obtaining a
>>    new RDD[Row]
>>  - extend the schema manually
>>  - create a new RDD by combining the RDD[Row] with the extended
>>    schema.
>>
>> This works very well, but I run into trouble querying that resulting
>> SchemaRDD with SQL if:
>>
>>  - the result of my computation is a case class
>>  - and I want to use values in this case class in the SQL query.
>>
>> In particular, while
>>
>>   SELECT column FROM resultrdd
>>
>> works well,
>>
>>   SELECT column.key_name FROM resultrdd
>>
>> gives a
>>
>>   java.lang.ClassCastException: example.MyCaseClass cannot be cast to
>> org.apache.spark.sql.catalyst.expressions.Row
>>
>> Here is an example to illustrate that:
>>
>> -----------------------------------
>>
>> import org.apache.spark._import org.apache.spark.sql._import 
>> org.apache.spark.sql.catalyst.types._
>>
>> val sc = new SparkContext("local[3]", "Test")
>> val sqlc = new SQLContext(sc)import sqlc._
>> // this is the case class that my operation is returningcase class 
>> Result(string_values: Map[String, String],                  num_values: 
>> Map[String, Double])// dummy result dataval data = (Result(Map("team" -> 
>> "a"), Map("score" -> 0.8)) ::            Result(Map("team" -> "b"), 
>> Map("score" -> 0.5)) :: Nil)val rdd = sc.parallelize(data)// simulate my 
>> computation by creating an RDD[Row] and creating// a schema 
>> programmaticallyval rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))val 
>> progSchema = StructType(StructField("hello", IntegerType, false) ::          
>>                   StructField("newcol", rdd.schema, true) :: Nil)val progRdd 
>> = sqlc.applySchema(rowRdd, progSchema)progRdd.registerTempTable("progrdd")// 
>> the following call will *fail* with a ClassCastExceptionsqlc.sql("SELECT 
>> newcol.string_values['team'] FROM progrdd").foreach(println)// however, the 
>> schema I specified is correct. see how embedding// my result in a proper 
>> case class works:case class ResultContainer(hello: Int, newcol: Result)val 
>> caseClassRdd = rdd.map(dr => ResultContainer(7, 
>> dr))caseClassRdd.registerTempTable("caseclassrdd")// the following call will 
>> *work*sqlc.sql("SELECT newcol.string_values['team'] FROM 
>> caseclassrdd").foreach(println)// even though the schema for both RDDs is 
>> the same:progRdd.schema == caseClassRdd.schema
>>
>>
>> -----------------------------------
>>
>> It turns out that I cannot use the case class directly, but I have to
>> convert it to a Row as well. That is, instead of
>>
>>   val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
>>
>> I have to use
>>
>>   val rowRdd = rdd.map(dr => Row.fromSeq(7 ::
>> Row.fromSeq(dr.productIterator.toSeq) :: Nil))
>>
>> and then, I can use
>>
>>   SELECT newcol.string_values['team'] FROM progrdd
>>
>> So now I found that out and I'm happy that it works, but it was quite
>> hard to track it down, so I was wondering if this is the most
>> intuitive way to add a column to a SchemaRDD using mapPartitions (as
>> opposed to using a UDF, where the conversion "case class -> Row"
>> seems to happen automatically).
>>
>> Or, even if there is no more intuitive way, just wanted to have this
>> documented ;-)
>>
>> Thanks
>> Tobias
>>
>>
>

Reply via email to