I am trying to implement checkpointing in my streaming application, but I am getting a NotSerializableException. Has anyone encountered this? I am deploying the job in YARN cluster mode.
Here is a snippet of the main parts of the code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {

  // Create and set up the streaming context
  def createContext(
      batchInterval: Int,
      checkpointDirectory: String,
      awsS3BucketName: String,
      databaseName: String,
      tableName: String,
      partitionByColumnName: String): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with the batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    }

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 6) {
      System.err.println(
        "Usage: S3EventIngestion <batch-interval> <checkpoint-directory> " +
          "<s3-bucket-name> <database-name> <table-name> <partition-by-column-name>")
      System.exit(1)
    }

    val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

    // Get the streaming context from checkpoint data, or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval.toInt, checkpoint, bucket, database, table, partitionBy))

    // Start the streaming context
    context.start()
    context.awaitTermination()
  }
}

Can someone help, please?

Thanks,
Ben
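P.S. My current theory is that checkpointing serializes the DStream graph, including the foreachRDD closure, and that the closure is capturing the SQLContext (and with it the SparkContext) created in createContext, neither of which is serializable. In case it helps the discussion, here is a minimal sketch of the workaround I am planning to try, assuming the closure capture is indeed the culprit; it looks up the SQLContext per batch instead of capturing it:

    csv.foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        // Fetch (or lazily create) the SQLContext from the RDD's own
        // SparkContext inside the closure, so no non-serializable
        // reference is stored in the checkpointed DStream graph.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        // process data
      }
    }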