Hi,

I have a SchemaRDD to which I want to add a column whose value is
computed from the rest of the row. As the computation involves a
network operation and requires setup code, I can't use
  "SELECT *, myUDF(*) FROM rdd",
so instead I wanted to use a combination of the following steps:

 - get schema of input SchemaRDD
 - issue a mapPartitions call (including the setup code), obtaining a
   new RDD[Row]
 - extend the schema manually
 - create a new RDD by combining the RDD[Row] with the extended
   schema.
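
The steps above could be sketched roughly as follows. This is only a
sketch, assuming the Spark 1.1-era API used elsewhere in this mail;
LookupClient and addColumn are hypothetical names made up for
illustration, and the "network operation" is replaced by a dummy
string computation.

```scala
import org.apache.spark.sql._

// Hypothetical stand-in for a client that needs per-partition setup.
class LookupClient {
  def lookup(row: Row): String = row.toSeq.mkString(",")
}

// Hypothetical helper: appends a string column "newcol" to a SchemaRDD.
def addColumn(sqlc: SQLContext, input: SchemaRDD): SchemaRDD = {
  // 1. get the schema of the input SchemaRDD
  val inputSchema = input.schema

  // 2. run the setup code once per partition, then map every row
  val rowRdd = input.mapPartitions { rows =>
    val client = new LookupClient()
    // `rows` is a lazy iterator, so the client has to stay usable
    // until the iterator has been consumed
    rows.map(row => Row.fromSeq(row.toSeq :+ client.lookup(row)))
  }

  // 3. extend the schema manually
  val extendedSchema =
    StructType(inputSchema.fields :+ StructField("newcol", StringType, true))

  // 4. combine the RDD[Row] with the extended schema
  sqlc.applySchema(rowRdd, extendedSchema)
}
```

With a plain String as the new column there is no problem; the trouble
described below only shows up when the new value is a case class.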

This works very well, but I run into trouble querying that resulting
SchemaRDD with SQL if:

 - the result of my computation is a case class
 - and I want to use values in this case class in the SQL query.

In particular, while

  SELECT column FROM resultrdd

works well,

  SELECT column.key_name FROM resultrdd

gives a

  java.lang.ClassCastException: example.MyCaseClass cannot be cast to
    org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

-----------------------------------

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

val sc = new SparkContext("local[3]", "Test")
val sqlc = new SQLContext(sc)
import sqlc._

// this is the case class that my operation is returning
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

// dummy result data
val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
            Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
val rdd = sc.parallelize(data)

// simulate my computation by creating an RDD[Row] and creating
// a schema programmatically
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
val progSchema =
  StructType(StructField("hello", IntegerType, false) ::
             StructField("newcol", rdd.schema, true) :: Nil)
val progRdd = sqlc.applySchema(rowRdd, progSchema)
progRdd.registerTempTable("progrdd")

// the following call will *fail* with a ClassCastException
sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)

// however, the schema I specified is correct. see how embedding
// my result in a proper case class works:
case class ResultContainer(hello: Int, newcol: Result)
val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
caseClassRdd.registerTempTable("caseclassrdd")

// the following call will *work*
sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)

// even though the schema for both RDDs is the same:
progRdd.schema == caseClassRdd.schema


-----------------------------------

It turns out that I cannot use the case class directly, but I have to
convert it to a Row as well. That is, instead of

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

  val rowRdd = rdd.map(dr =>
    Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then, I can use

  SELECT newcol.string_values['team'] FROM progrdd
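
To make it clearer what the conversion passes to Row.fromSeq, here is
a small plain-Scala illustration (no Spark needed). It reuses the
Result case class from the example; Outer is a hypothetical wrapper
class added only to show the nesting behaviour.

```scala
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

// hypothetical wrapper, only used to show what happens with nesting
case class Outer(id: Int, inner: Result)

val r = Result(Map("team" -> "a"), Map("score" -> 0.8))

// productIterator yields the constructor fields in declaration order
val fields: Seq[Any] = r.productIterator.toSeq

// only one level is unpacked: a nested case class stays a case class
val outerFields: Seq[Any] = Outer(7, r).productIterator.toSeq
val nestedIsStillCaseClass = outerFields(1).isInstanceOf[Result]
```

So if a case class contains further case classes, each nested one
presumably needs the same explicit conversion before the Row is built.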

Having found that out, I'm happy that it works, but it was quite hard
to track down. So I was wondering whether this is the most intuitive
way to add a column to a SchemaRDD using mapPartitions (as opposed to
using a UDF, where the conversion "case class -> Row" seems to happen
automatically).

Or, even if there is no more intuitive way, I just wanted to have this
documented ;-)

Thanks
Tobias
