I am trying to implement checkpointing in my Spark Streaming application, but 
I am getting a NotSerializableException. Has anyone encountered this? I am 
deploying the job in YARN cluster mode.
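
For context, I submit the job roughly like this (the jar name, checkpoint 
path, and argument values below are placeholders):

    spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --class S3EventIngestion \
        s3-event-ingestion.jar \
        30 hdfs:///checkpoints/s3-event-ingestion my-bucket my_db my_table event_date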

Here is a snippet of the main parts of the code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {
    // Create and configure a new streaming context
    def createContext(
        batchInterval: Int,
        checkpointDirectory: String,
        awsS3BucketName: String,
        databaseName: String,
        tableName: String,
        partitionByColumnName: String
    ): StreamingContext = {

        println("Creating new context")
        val sparkConf = new SparkConf().setAppName("S3EventIngestion")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)

        // Create the streaming context with the given batch interval
        val ssc = new StreamingContext(sc, Seconds(batchInterval))

        // Watch the S3 bucket for new text files
        val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

        csv.foreachRDD { rdd =>
            if (!rdd.partitions.isEmpty) {
                // process data
            }
        }

        ssc.checkpoint(checkpointDirectory)
        ssc
    }

    def main(args: Array[String]) {
        if (args.length != 6) {
            System.err.println(
                "Usage: S3EventIngestion <batch-interval> <checkpoint-directory> " +
                "<s3-bucket-name> <database-name> <table-name> <partition-by-column-name>")
            System.exit(1)
        }

        val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

        // Get the streaming context from checkpoint data, or create a new one
        val context = StreamingContext.getOrCreate(checkpoint,
            () => createContext(interval.toInt, checkpoint, bucket, database, table, partitionBy))

        // Start the streaming context
        context.start()
        context.awaitTermination()
    }
}
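
One thing I am not sure about: could something captured by the foreachRDD 
closure (for example the sqlContext created in createContext, or something 
in the processing I elided above) be what fails to serialize once 
checkpointing is enabled? As an experiment I was going to fetch the 
SQLContext lazily inside the closure instead, along these lines (sketch only):

    csv.foreachRDD { rdd =>
        if (!rdd.partitions.isEmpty) {
            // Look up (or lazily create) a singleton SQLContext from the
            // RDD's SparkContext instead of capturing the driver-side one
            val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
            // process data
        }
    }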

Can someone help please?

Thanks,
Ben