Below is the code I have written. I am getting a NotSerializableException. How can I handle this scenario?
kafkaStream.foreachRDD(rdd => {
  println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach(
      record => {
        // Write for CSV.
        if (true == true) {
          val structType = table.schema
          val csvFile = ssc.sparkContext.textFile(record.toString())
          val rowRDD = csvFile.map(x =>
            getMappedRowFromCsvRecord(structType, x))
        }
      })
  })
})
--
Regards,
Nishant
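
A likely cause, though it cannot be confirmed without the full stack trace: the function passed to foreachPartition runs on the executors, so Spark has to serialize everything that function references. Here that includes ssc (and possibly table), and a StreamingContext is not serializable. A SparkContext also cannot be used inside executor code to create new RDDs. Below is a minimal sketch of one possible workaround, assuming each record is the path of a CSV file and that the number of paths per batch is small enough to collect to the driver. The names CsvFromStream and process, and the body of getMappedRowFromCsvRecord, are hypothetical stand-ins and not the original code.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.dstream.DStream

object CsvFromStream {

  // Hypothetical stand-in for the poster's helper: splits a CSV line on
  // commas and ignores the schema. Replace with the real mapping logic.
  def getMappedRowFromCsvRecord(schema: StructType, line: String): Row =
    Row.fromSeq(line.split(",", -1).toSeq)

  // Assumption: every element of the stream, once converted with toString,
  // is a path that SparkContext.textFile can read.
  def process[T](stream: DStream[T], schema: StructType): Unit = {
    stream.foreachRDD { rdd =>
      // The body of foreachRDD runs on the driver, so the SparkContext
      // can be used here without being shipped to executors.
      val sc = rdd.sparkContext

      // Bring the file paths back to the driver.
      // Assumption: only a handful of paths arrive per batch.
      val paths: Array[String] = rdd.map(_.toString).collect()

      paths.foreach { path =>
        // Only `schema` (StructType is serializable) is captured by the
        // closure below; no context object is referenced inside it.
        val rowRDD = sc.textFile(path)
          .map(line => getMappedRowFromCsvRecord(schema, line))
        println(s"$path: ${rowRDD.count()} rows")
      }
    }
  }
}
```

With this sketch the call site would be CsvFromStream.process(kafkaStream, table.schema), so table.schema is read once on the driver and only the resulting StructType travels to the executors. If the per-batch path list can be large, collecting it to the driver is not appropriate and a different design would be needed.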
