Hi All,

I am having trouble writing generic code that uses SQLContext and RDDs. Can
you suggest what might be wrong?

 class SparkTable[T : ClassTag](val sqlContext: SQLContext, val extractor: String => T) {

   private[this] var location: Option[String] = None
   private[this] var name: Option[String] = None
   private[this] val sc = sqlContext.sparkContext
   ...

   def makeRDD(sqlQuery: String): SchemaRDD = {
     require(this.location.isDefined)
     require(this.name.isDefined)
     import sqlContext._  // brings the implicit createSchemaRDD conversion into scope
     val rdd: RDD[String] = sc.textFile(this.location.get)
     val rddT: RDD[T] = rdd.map(extractor)
     val schemaRDD: SchemaRDD = createSchemaRDD(rddT)  // <-- fails to compile here
     schemaRDD.registerAsTable(name.get)
     sqlContext.sql(sqlQuery)
   }

 }

I use it as follows:

 def extractor(line: String): POJO = {
   val splits = line.split(pattern).toList
   POJO(splits(0), splits(1), splits(2), splits(3))
 }
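
For reference, POJO is essentially a case class along these lines (the real
field names differ):

 case class POJO(c0: String, c1: String, c2: String, c3: String)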

 val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)

 val identityData: SchemaRDD =
   pojoTable.atLocation("hdfs://location/table")
     .withName("pojo")
     .makeRDD("SELECT * FROM pojo")
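
(atLocation and withName, elided in the class above, are simple fluent
setters, roughly:

 def atLocation(loc: String): SparkTable[T] = { this.location = Some(loc); this }
 def withName(tableName: String): SparkTable[T] = { this.name = Some(tableName); this }

They just record the location and table name and return this for chaining.)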


I get a compilation failure:

inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]
[error]     val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
[error]                              ^
[error]  SparkTable.scala:37: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[T]
[error]  required: org.apache.spark.rdd.RDD[A]
[error]     val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
[error]                                              ^
[error] two errors found

Am I missing something basic in Scala reflection/types or implicits?
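
From the error, my guess is that createSchemaRDD's bound has to be carried
through on T itself. In SQLContext the conversion is declared roughly as
createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]), so presumably the class
would need something like the sketch below, though I am not sure this is the
right approach:

 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag

 // Guess: propagate the Product upper bound (and a TypeTag) onto T so the
 // compiler can prove T <: Product inside the generic class.
 class SparkTable[T <: Product : ClassTag : TypeTag](
     val sqlContext: SQLContext,
     val extractor: String => T) {
   // ... body unchanged ...
 }

Since POJO is a case class it already extends Product, so the call site should
still compile. Is that the right direction?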

Any hints would be appreciated.

Thanks
Amit
