Re: Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Jacek Laskowski
On Fri, Aug 26, 2016 at 10:54 PM, Benjamin Kim wrote:

> // Create a text file stream on an S3 bucket
> val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")
>
> csv.foreachRDD(rdd => {
>   if (!rdd.partitions.isEmpty) {
>     // process data
>   }
> })

Hi Benjamin,

I hardly remember the case now, but I'd recommend moving the above
snippet *after* you getOrCreate the context, i.e.

> // Get streaming context from checkpoint data or create a new one
> val context = StreamingContext.getOrCreate(checkpoint,
>   () => createContext(interval, checkpoint, bucket, database,
>     table, partitionBy))

That is, move the first snippet here, so you only create the pipeline
after you get the context.
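
Something along these lines, as an untested sketch (createContext would
then only create the StreamingContext and set the checkpoint directory,
without the textFileStream/foreachRDD part):

val context = StreamingContext.getOrCreate(checkpoint,
  () => createContext(interval, checkpoint, bucket, database,
    table, partitionBy))

// Build the pipeline only once you have the context
val csv = context.textFileStream("s3a://" + bucket + "/")
csv.foreachRDD { rdd =>
  if (!rdd.partitions.isEmpty) {
    // process data
  }
}

context.start()
context.awaitTermination()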

I might be mistaken, too. Sorry.

Regards,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Benjamin Kim
I am trying to implement checkpointing in my streaming application, but I am
getting a "not serializable" error. Has anyone encountered this? I am deploying
this job in YARN cluster mode.

Here is a snippet of the main parts of the code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {
  // Create and set up the streaming context
  def createContext(
    batchInterval: Integer, checkpointDirectory: String, awsS3BucketName: String,
    databaseName: String, tableName: String, partitionByColumnName: String
  ): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with the batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: S3EventIngestion <batchInterval> <checkpointDirectory> " +
        "<bucketName> <databaseName> <tableName> <partitionByColumn>")
      System.exit(1)
    }

    // Extract the six arguments (names reconstructed from createContext's parameters)
    val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

    // Get the streaming context from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval.toInt, checkpoint, bucket, database,
        table, partitionBy))

    // Start the streaming context
    context.start()
    context.awaitTermination()
  }
}

Can someone help please?

Thanks,
Ben