Heh, this question keeps coming up. You can't use a SparkContext or an RDD inside a distributed operation; they're only usable from the driver. Here you're trying to call textFile from within foreachPartition, whose body runs on the executors.
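As a sketch of one way around it (untested, and assuming each Kafka record is just a path to a CSV file, and that table, ssc and getMappedRowFromCsvRecord are your own code): pull the textFile call back to the driver. The body of foreachRDD itself runs on the driver, so the context is fine there; only the closures you hand to foreachPartition / foreach / map ship to executors. Something like:

kafkaStream.foreachRDD { rdd =>
  // Runs on the driver: collect the (presumably small) set of file
  // paths, then read each one with the context, which only exists
  // driver-side.
  val paths = rdd.map(_.toString).collect()
  paths.foreach { path =>
    val structType = table.schema
    val csvFile = ssc.sparkContext.textFile(path)
    val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))
    // ... save or otherwise act on rowRDD here ...
  }
}

The collect() is only reasonable if each batch carries a handful of records; if not, you'd want to rethink the design rather than launch a separate job per record anyway.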
On Fri, Jan 23, 2015 at 10:59 AM, Nishant Patel <[email protected]> wrote:

> Below is the code I have written. I am getting NotSerializableException.
> How can I handle this scenario?
>
> kafkaStream.foreachRDD(rdd => {
>   println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
>   rdd.foreachPartition(partitionOfRecords => {
>     partitionOfRecords.foreach(
>       record => {
>
>         // Write for CSV.
>         if (true == true) {
>
>           val structType = table.schema
>           val csvFile = ssc.sparkContext.textFile(record.toString())
>
>           val rowRDD = csvFile.map(x =>
>             getMappedRowFromCsvRecord(structType, x))
>
>         }
>       })
>
> --
> Regards,
> Nishant
