Re: Spark 1.6 Streaming with Checkpointing
On Fri, Aug 26, 2016 at 10:54 PM, Benjamin Kim wrote:

> // Create a text file stream on an S3 bucket
> val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")
>
> csv.foreachRDD(rdd => {
>   if (!rdd.partitions.isEmpty) {
>     // process data
>   }
> })

Hi Benjamin,

I hardly remember the case now, but I'd recommend moving the above snippet *after* you getOrCreate the context, i.e.

> // Get streaming context from checkpoint data or create a new one
> val context = StreamingContext.getOrCreate(checkpoint,
>   () => createContext(interval, checkpoint, bucket, database,
>     table, partitionBy))

Please move the first snippet here, so you only create the pipeline after you get the context. I might be mistaken, too. Sorry.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
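[For readers of the archive, the reordering Jacek describes would look roughly like the sketch below. This is only an illustration assuming Benjamin's helper and variable names (createContext, interval, checkpoint, bucket, etc.) from the original post, not code from the thread, and Jacek himself hedges the suggestion.]

```scala
// Sketch of the suggested ordering: obtain the StreamingContext first
// (recovered from the checkpoint or freshly created), then attach the
// file stream and the per-batch processing to it.
val context = StreamingContext.getOrCreate(checkpoint,
  () => createContext(interval, checkpoint, bucket, database,
    table, partitionBy))

// Build the pipeline only after the context exists
val csv = context.textFileStream("s3a://" + bucket + "/")
csv.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
    // process data
  }
})

context.start()
context.awaitTermination()
```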
Spark 1.6 Streaming with Checkpointing
I am trying to implement checkpointing in my streaming application, but I am getting a "not serializable" error. Has anyone encountered this? I am deploying this job in YARN cluster mode. Here is a snippet of the main parts of the code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {
  // create and set up the streaming context
  def createContext(
      batchInterval: Integer,
      checkpointDirectory: String,
      awsS3BucketName: String,
      databaseName: String,
      tableName: String,
      partitionByColumnName: String
  ): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: S3EventIngestion <batchInterval> <checkpointDirectory> <bucket> <database> <table> <partitionBy>")
      System.exit(1)
    }
    val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

    // Get streaming context from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval.toInt, checkpoint, bucket, database,
        table, partitionBy))

    // start streaming context
    context.start()
    context.awaitTermination()
  }
}

Can someone help please?

Thanks,
Ben
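[Archive note: with checkpointing enabled, a "not serializable" error of this kind commonly means the foreachRDD closure captured a non-serializable object from the enclosing scope, such as the SQLContext created in createContext, since the whole DStream graph is serialized into the checkpoint. Whether that is the exact cause of Ben's error is not confirmed in this thread; the sketch below shows one frequently used workaround in Spark 1.6, looking the SQLContext up per batch instead of capturing it.]

```scala
// Workaround sketch (assumed cause, not confirmed in the thread):
// avoid capturing the outer SQLContext in the checkpointed closure.
csv.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
    // SQLContext.getOrCreate returns a singleton tied to this SparkContext,
    // so nothing non-serializable from createContext's scope is captured.
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    // process data, e.g. build a DataFrame from rdd via sqlContext
  }
})
```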