Oh sorry, I'm rereading your email more carefully. It's only because you have some setup code that you want to amortize?
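If so, the usual pattern is to pay the setup cost once per partition via mapPartitions. A Spark-free sketch of that idea, using plain iterators (`expensiveSetup` here is a hypothetical stand-in for your network/client setup, not anything from the thread):

```scala
// Sketch of the amortization idea behind mapPartitions:
// the expensive setup runs once per partition (i.e. once per iterator),
// not once per row. `expensiveSetup` is a hypothetical stand-in for
// e.g. opening a network connection.

var setupCalls = 0 // just to observe how often setup actually runs

def expensiveSetup(): Int => String = {
  setupCalls += 1
  (x: Int) => s"row-$x"
}

// Analogous to rdd.mapPartitions { iter => ... }: one setup per partition.
def processPartition(rows: Iterator[Int]): Iterator[String] = {
  val f = expensiveSetup() // amortized over all rows in this partition
  rows.map(f)
}

val out = processPartition(Iterator(1, 2, 3)).toList
// three rows processed, but setup ran only once
```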
On Mon, Jan 5, 2015 at 10:40 PM, Michael Armbrust <mich...@databricks.com> wrote:
> The types expected by applySchema are documented in the type reference
> section:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference
>
> I'd certainly accept a PR to improve the docs and add a link to this from
> the applySchema section :)
>
> Can you explain why you are using mapPartitions and why UDFs don't work
> for you? SQL doesn't really have great support for partitions in
> general... We do support Hive TGFs, though, and we could possibly add
> better Scala syntax for this concept, or something else.
>
> On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> I have a SchemaRDD where I want to add a column with a value that is
>> computed from the rest of the row. As the computation involves a
>> network operation and requires setup code, I can't use
>>
>>     SELECT *, myUDF(*) FROM rdd
>>
>> but instead I wanted to use a combination of:
>>
>> - get the schema of the input SchemaRDD
>> - issue a mapPartitions call (including the setup code), obtaining a
>>   new RDD[Row]
>> - extend the schema manually
>> - create a new RDD by combining the RDD[Row] with the extended schema
>>
>> This works very well, but I run into trouble querying the resulting
>> SchemaRDD with SQL if:
>>
>> - the result of my computation is a case class
>> - and I want to use values from this case class in the SQL query.
>>
>> In particular, while
>>
>>     SELECT column FROM resultrdd
>>
>> works well,
>>
>>     SELECT column.key_name FROM resultrdd
>>
>> gives a
>>
>>     java.lang.ClassCastException: example.MyCaseClass cannot be cast to
>>     org.apache.spark.sql.catalyst.expressions.Row
>>
>> Here is an example to illustrate that:
>>
>> -----------------------------------
>>
>>     import org.apache.spark._
>>     import org.apache.spark.sql._
>>     import org.apache.spark.sql.catalyst.types._
>>
>>     val sc = new SparkContext("local[3]", "Test")
>>     val sqlc = new SQLContext(sc)
>>     import sqlc._
>>
>>     // this is the case class that my operation is returning
>>     case class Result(string_values: Map[String, String],
>>                       num_values: Map[String, Double])
>>
>>     // dummy result data
>>     val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
>>                 Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
>>     val rdd = sc.parallelize(data)
>>
>>     // simulate my computation by creating an RDD[Row] and creating
>>     // a schema programmatically
>>     val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
>>     val progSchema = StructType(StructField("hello", IntegerType, false) ::
>>                                 StructField("newcol", rdd.schema, true) :: Nil)
>>     val progRdd = sqlc.applySchema(rowRdd, progSchema)
>>     progRdd.registerTempTable("progrdd")
>>
>>     // the following call will *fail* with a ClassCastException
>>     sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)
>>
>>     // however, the schema I specified is correct; see how embedding
>>     // my result in a proper case class works:
>>     case class ResultContainer(hello: Int, newcol: Result)
>>     val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
>>     caseClassRdd.registerTempTable("caseclassrdd")
>>
>>     // the following call will *work*
>>     sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)
>>
>>     // even though the schema for both RDDs is the same:
>>     progRdd.schema == caseClassRdd.schema
>>
>> -----------------------------------
>>
>> It turns out that I cannot use the case class directly, but I have to
>> convert it to a Row as well. That is, instead of
>>
>>     val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
>>
>> I have to use
>>
>>     val rowRdd = rdd.map(dr =>
>>       Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))
>>
>> and then I can use
>>
>>     SELECT newcol.string_values['team'] FROM progrdd
>>
>> So now I have found that out and I'm happy that it works, but it was
>> quite hard to track down, so I was wondering if this is the most
>> intuitive way to add a column to a SchemaRDD using mapPartitions (as
>> opposed to using a UDF, where the "case class -> Row" conversion
>> seems to happen automatically).
>>
>> Or, even if there is no more intuitive way, I just wanted to have this
>> documented ;-)
>>
>> Thanks
>> Tobias
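The flattening step in Tobias's fix can be illustrated without Spark: `productIterator` yields a case class's field values in declaration order, which is exactly the shape `Row.fromSeq` consumes. A minimal sketch, reusing the `Result` class from the thread:

```scala
// Spark-free illustration of the fix from the thread: a nested case class
// must be flattened into its field values rather than stored as-is,
// because the SQL layer expects a Row, not the case class instance.
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

val dr = Result(Map("team" -> "a"), Map("score" -> 0.8))

// productIterator walks the case class fields in declaration order;
// in Spark this Seq would then be passed to Row.fromSeq.
val flattened = dr.productIterator.toSeq
// flattened == Seq(Map("team" -> "a"), Map("score" -> 0.8))
```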