Replace

    val sqlContext = new SQLContext(sparkContext)

with

    @transient val sqlContext = new SQLContext(sparkContext)

The map and mapPartitionsWithIndex calls reference instance methods (getRow, removeHeader), so Spark's closure cleaner has to serialize the whole MyClass instance, and the SQLContext (and SparkContext) fields are what fail with NotSerializableException. Marking them @transient keeps them out of the task closure; they are only needed on the driver anyway.
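For example, here is a minimal sketch of the wrapper with the driver-only contexts marked @transient (class and method names and the Spark 1.2-era SQLContext API are taken from your snippet; depending on your Scala version you may also need the annotation on the constructor parameter, since it becomes a field once applySchemaToCsv uses it):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql._

    // The class is Serializable so instances referenced from RDD closures can be
    // shipped to executors; the contexts are @transient so they are skipped
    // during that serialization and stay on the driver.
    class MyClass(@transient val sparkContext: SparkContext) extends Serializable {

      @transient val sqlContext = new SQLContext(sparkContext)

      // Referenced from map(), which forces serialization of the enclosing
      // MyClass instance; with the contexts transient this now succeeds.
      def getRow(strRow: String): Row = {
        val tRow = strRow.split(',').map(_.trim)
        Row(tRow: _*)
      }
    }

With that change, calling it the same way from the shell (val test = new MyClass(sc); test.applySchemaToCsv(...)) should no longer trip the closure cleaner.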
-----Original Message-----
From: kpeng1 [mailto:kpe...@gmail.com]
Sent: 04 March 2015 23:39
To: user@spark.apache.org
Subject: Passing around SparkContext with in the Driver

Hi All,

I am trying to create a class that wraps functionality that I need; some of these functions require access to the SparkContext, which I would like to pass in. I know that the SparkContext is not serializable, and I am not planning on passing it to worker nodes or anything; I just want to wrap some functionality that requires the SparkContext's API. As a preface, I am basically using the spark shell to test the functionality of my code at the moment, so I am not sure if that plays into any of the issues I am having.

Here is my current class:

    class MyClass(sparkContext: SparkContext) {
      import org.apache.spark.sql._
      import org.apache.spark.rdd._

      val sqlContext = new SQLContext(sparkContext)

      val DATA_TYPE_MAPPING = Map(
        "int"    -> IntegerType,
        "double" -> DoubleType,
        "float"  -> FloatType,
        "long"   -> LongType,
        "short"  -> ShortType,
        "binary" -> BinaryType,
        "bool"   -> BooleanType,
        "byte"   -> ByteType,
        "string" -> StringType)

      //removes the first line of a text file
      def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
        //header line is first line in first partition
        if (partitionIdx == 0) {
          fileItr.drop(1)
        }
        fileItr
      }

      //returns back a StructType for the schema
      def getSchema(rawSchema: Array[String]): StructType = {
        //returns back a StructField
        def getSchemaFieldHelper(schemaField: String): StructField = {
          val schemaParts = schemaField.split(' ')
          StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
        }

        val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
        StructType(structFields)
      }

      def getRow(strRow: String): Row = {
        val spRow = strRow.split(',')
        val tRow = spRow.map(_.trim)
        Row(tRow: _*)
      }

      def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
        //apply schema to rdd to create schemaRDD
        def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
          val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
          schemaRDD
        }

        val rawSchema = sparkContext.textFile(schemaFile).collect
        val schema = getSchema(rawSchema)
        val rawCsvData = sparkContext.textFile(csvFile)

        //if we want to keep header from csv file
        if (includeHeader) {
          val rowRDD = rawCsvData.map(getRow)
          val schemaRDD = createSchemaRDD(rowRDD, schema)
          return schemaRDD
        }

        val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
        val rowRDD = csvData.map(getRow)
        val schemaRDD = createSchemaRDD(rowRDD, schema)
        schemaRDD
      }
    }

So in the spark shell I am basically creating an instance of this class and calling applySchemaToCsv like so:

    val test = new MyClass(sc)
    test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is a not serializable exception:

    15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at <console>:62
    org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
      at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
      .
      .
      .
    Caused by: java.io.NotSerializableException:

If I remove the class wrapper and make references to sc directly, everything works. I am basically wondering what is causing the serialization issues and if I can wrap a class around these functions.