Hi All, I was just toying with creating a very rudimentary RDD datasource to understand the inner workings of RDDs.
It seems that one of the constructors for RDD takes a parameter of type SparkContext, but the context (apparently) exists on the driver only and is not serializable. Consequently, any attempt to use the SparkContext parameter inside a custom RDD produces a runtime error saying it is not serializable. I'm just wondering what the rationale behind this is: if it is not serializable/usable, why make it a parameter? And if it does need to be a parameter, why not make it serializable (is that even possible)?

Below is my working code where I test a custom RDD.

scala> val mydata = spark.read.format("MyDataSourceProvider").load()
mydata: org.apache.spark.sql.DataFrame = [mydataStr: string]

scala> mydata.show(10, false)
+------------------------+
|mydataStr               |
+------------------------+
|Partition: 0, row 1 of 3|
|Partition: 0, row 2 of 3|
|Partition: 0, row 3 of 3|
+------------------------+

///// custom RDD

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class MyDataSourceProvider extends DataSourceRegister with RelationProvider with Logging {

  override def shortName(): String = "mydata"

  private val myDataSchema: StructType =
    new StructType(Array[StructField](new StructField("mydataStr", StringType, false)))

  def sourceSchema(sqlContext: SQLContext,
                   schema: Option[StructType],
                   providerName: String,
                   parameters: Map[String, String]): (String, StructType) = {
    (shortName(), schema.get)
  }

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    new MyDataRelation(sqlContext, myDataSchema, parameters)
  }
}

class MyDataRelation(override val sqlContext: SQLContext,
                     override val schema: StructType,
                     params: Map[String, String])
  extends BaseRelation with TableScan with Logging {

  override def buildScan(): RDD[Row] = {
    val rdd = new MyDataSourceRDD(sqlContext.sparkContext, sqlContext.getAllConfs)
    rdd
  }

  override def needConversion = true
}

class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String]) extends RDD[Row](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    // sc.getConf.getAll.foreach(println) -- this fails with a "SparkContext not serializable" error. So what use is this parameter?!
    val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
    val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", "3").toInt
    val partitions = 0 until numPartitions map (partition => new MyDataSourcePartition(partition, rowsPerPartition))
    partitions.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
    val partitionId = myDataSourcePartition.index
    val rows = myDataSourcePartition.rowCount
    val partitionData = 1 to rows map (r => Row(s"Partition: ${partitionId}, row ${r} of ${rows}"))
    partitionData.iterator
  }
}

class MyDataSourcePartition(partitionId: Int, rows: Int) extends Partition with Serializable {
  override def index: Int = partitionId
  def rowCount: Int = rows
}
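For what it's worth, below is a minimal sketch of the pattern I'm assuming is intended (MyDataSourceRDD2 and sparkConfSnapshot are names I made up for illustration; it reuses the MyDataSourcePartition class above): copy whatever you need from the SparkContext into plain serializable values while still on the driver, and keep the context reference itself @transient so it never gets serialized. If I'm reading the RDD source correctly, the base class stores its own SparkContext in a @transient field for exactly this reason, which would explain why the constructor can accept a context that is never meant to leave the driver.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.{Partition, SparkContext, TaskContext}

// Same behaviour as MyDataSourceRDD above, but the context reference is @transient
// and the driver-side configuration is snapshotted into a serializable array.
class MyDataSourceRDD2(@transient private val sc: SparkContext,
                       conf: Map[String, String]) extends RDD[Row](sc, Nil) {

  // Evaluated once, on the driver; an Array[(String, String)] is serializable,
  // so it can travel with the RDD if a task ever needs it.
  private val sparkConfSnapshot: Array[(String, String)] = sc.getConf.getAll

  override def getPartitions: Array[Partition] = {
    sparkConfSnapshot.foreach(println)   // no SparkContext reference needed here
    val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
    val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", "3").toInt
    (0 until numPartitions).map(p => new MyDataSourcePartition(p, rowsPerPartition)).toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val part = split.asInstanceOf[MyDataSourcePartition]
    (1 to part.rowCount).iterator.map(r => Row(s"Partition: ${part.index}, row ${r} of ${part.rowCount}"))
  }
}

With that change the RDD no longer drags a SparkContext along when tasks are serialized; the only state shipped to the executors is the conf map, the snapshot array and the MyDataSourcePartition objects. Still, I'd like to understand whether that is really the intended use of the constructor parameter.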