Hi,

I have a SchemaRDD where I want to add a column with a value that is computed from the rest of the row. As the computation involves a network operation and requires setup code, I can't use "SELECT *, myUDF(*) FROM rdd", so instead I wanted to use a combination of:
- get the schema of the input SchemaRDD
- issue a mapPartitions call (including the setup code), obtaining a new RDD[Row]
- extend the schema manually
- create a new SchemaRDD by combining the RDD[Row] with the extended schema.

This works very well, but I run into trouble querying the resulting SchemaRDD with SQL if:
- the result of my computation is a case class, and
- I want to use values in this case class in the SQL query.

In particular, while

    SELECT column FROM resultrdd

works well,

    SELECT column.key_name FROM resultrdd

gives a

    java.lang.ClassCastException: example.MyCaseClass cannot be cast to org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

-----------------------------------
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

val sc = new SparkContext("local[3]", "Test")
val sqlc = new SQLContext(sc)
import sqlc._

// this is the case class that my operation is returning
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

// dummy result data
val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
            Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
val rdd = sc.parallelize(data)

// simulate my computation by creating an RDD[Row] and creating
// a schema programmatically
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
val progSchema = StructType(StructField("hello", IntegerType, false) ::
                            StructField("newcol", rdd.schema, true) :: Nil)
val progRdd = sqlc.applySchema(rowRdd, progSchema)
progRdd.registerTempTable("progrdd")

// the following call will *fail* with a ClassCastException
sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)

// however, the schema I specified is correct. See how embedding
// my result in a proper case class works:
case class ResultContainer(hello: Int, newcol: Result)
val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
caseClassRdd.registerTempTable("caseclassrdd")

// the following call will *work*
sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)

// even though the schema for both RDDs is the same:
progRdd.schema == caseClassRdd.schema
-----------------------------------

It turns out that I cannot use the case class directly; I have to convert it to a Row as well. That is, instead of

    val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

    val rowRdd = rdd.map(dr => Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then I can use

    SELECT newcol.string_values['team'] FROM progrdd

So now that I have found that out, I'm happy that it works, but it was quite hard to track down, so I was wondering whether this is the most intuitive way to add a column to a SchemaRDD using mapPartitions (as opposed to using a UDF, where the "case class -> Row" conversion seems to happen automatically). Or, even if there is no more intuitive way, I just wanted to have this documented ;-)

Thanks
Tobias
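The `Row.fromSeq(dr.productIterator.toSeq)` workaround converts only the top-level case class. If `Result` itself contained another case class, that inner value would presumably still reach Spark SQL as a plain object and fail the same way. Below is a minimal sketch of a recursive conversion in plain Scala, so it runs without Spark. `RowLike` and `CaseClassToRow.toRowValue` are hypothetical names, and `RowLike` only stands in for `Row.fromSeq`. Mapping `None` to `null` is an assumption about how nullable fields should look. The match order matters because `Some` and `List` are themselves `Product`s and must not be turned into structs.

```scala
// RowLike is a hypothetical stand-in for org.apache.spark.sql.Row so that this
// sketch runs without Spark; in real code, use Row.fromSeq(values) instead.
final class RowLike(val values: Seq[Any]) {
  override def toString: String = values.mkString("[", ",", "]")
}

object CaseClassToRow {
  // Recursively converts a value so that every nested case class becomes a
  // row-like struct, which is what a StructType column expects to find.
  def toRowValue(value: Any): Any = value match {
    case null         => null
    case None         => null                     // assumption: Option becomes a nullable value
    case Some(inner)  => toRowValue(inner)
    case s: Seq[_]    => s.map(toRowValue)         // checked before Product: List is a Product
    case m: Map[_, _] => m.map { case (k, v) => (toRowValue(k), toRowValue(v)) }
    case p: Product   => new RowLike(p.productIterator.map(toRowValue).toList)
    case other        => other                     // strings, numbers, booleans, ...
  }
}
```

With `new RowLike(...)` swapped for `Row.fromSeq(...)`, a call such as `CaseClassToRow.toRowValue(dr)` could take the place of `Row.fromSeq(dr.productIterator.toSeq)` in the `rowRdd` mapping above.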