Hi All, I am having some trouble trying to write generic code that uses sqlContext and RDDs. Can you suggest what might be wrong?
class SparkTable[T : ClassTag](val sqlContext: SQLContext, val extractor: (String) => (T)) {

  private[this] var location: Option[String] = None
  private[this] var name: Option[String] = None
  private[this] val sc = sqlContext.sparkContext

  ...

  def makeRDD(sqlQuery: String): SchemaRDD = {
    require(this.location != None)
    require(this.name != None)
    import sqlContext._
    val rdd: RDD[String] = sc.textFile(this.location.get)
    val rddT: RDD[T] = rdd.map(extractor)
    val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    schemaRDD.registerAsTable(name.get)
    val all = sqlContext.sql(sqlQuery)
    all
  }
}

I use it as below:

def extractor(line: String): POJO = {
  val splits = line.split(pattern).toList
  POJO(splits(0), splits(1), splits(2), splits(3))
}

val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)

val identityData: SchemaRDD =
  pojoTable.atLocation("hdfs://location/table")
    .withName("pojo")
    .makeRDD("SELECT * FROM pojo")

I get a compilation failure:

inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]
[error]     val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error]                                ^
[error] SparkTable.scala:37: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[T]
[error]  required: org.apache.spark.rdd.RDD[A]
[error]     val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error]                                ^
[error] two errors found

I am probably missing something basic, either in Scala reflection/types or in implicits. Any hints would be appreciated.

Thanks,
Amit
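
P.S. My current guess (untested) is that the type parameter has to satisfy createSchemaRDD's bound itself, i.e. T must be a subtype of Product and carry a TypeTag so Spark SQL can derive the schema. Assuming POJO is a case class (case classes extend Product), this is the direction I mean, with atLocation/withName left as before:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SchemaRDD}

// Sketch: bound T so it matches createSchemaRDD's [A <: Product : TypeTag]
class SparkTable[T <: Product : ClassTag : TypeTag](
    val sqlContext: SQLContext,
    val extractor: String => T) {

  private[this] var location: Option[String] = None
  private[this] var name: Option[String] = None
  private[this] val sc = sqlContext.sparkContext

  // ... atLocation / withName unchanged ...

  def makeRDD(sqlQuery: String): SchemaRDD = {
    require(location.isDefined)
    require(name.isDefined)
    import sqlContext._                                // brings the implicit createSchemaRDD into scope
    val rdd: RDD[String] = sc.textFile(location.get)
    val rddT: RDD[T] = rdd.map(extractor)
    val schemaRDD: SchemaRDD = createSchemaRDD(rddT)   // T now satisfies the Product/TypeTag bound
    schemaRDD.registerAsTable(name.get)
    sqlContext.sql(sqlQuery)
  }
}

Is that the right direction, or is there a cleaner way to thread the implicit evidence through?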