Could you come up with a minimal example through which I can reproduce the problem?
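Something like the following might be the shape such a repro could take -- a minimal, untested sketch, where Box, MinimalRepro, the socket source, and the checkpoint path are all hypothetical stand-ins rather than anything from your job. The point is just a user-defined class captured by a checkpointed closure, so that recovery has to deserialize it:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

// Hypothetical stand-in for Bucket: a user-defined class that ends up
// serialized into the checkpointed DStream graph.
case class Box(label: String)

object MinimalRepro {
  def main(args: Array[String]) {
    val checkpointDir = "hdfs://hdfs-namenode.vagrant:9000/tmp/repro-checkpoint"

    def create(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("MinimalRepro"), Seconds(10))
      ssc.checkpoint(checkpointDir)
      val box = Box("HOUR") // captured by the closures below
      ssc.socketTextStream("localhost", 9999)
        .map(line => (box.label + line, 1))
        .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) =>
          Some(state.getOrElse(0) + values.sum))
        .print()
      ssc
    }

    // The first run creates the checkpoint; after killing the driver, the
    // second run recovers from it and must be able to load Box.
    val ssc = StreamingContext.getOrCreate(checkpointDir, create _)
    ssc.start()
    ssc.awaitTermination()
  }
}

If the second run (after killing the driver) fails with the same ClassNotFoundException for Box, that would pin the problem on checkpoint deserialization itself rather than on anything specific to your job.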
On Tue, Feb 10, 2015 at 12:30 PM, conor <fennell.co...@gmail.com> wrote:

> I am getting the following error when I kill the Spark driver and restart
> the job:
>
> 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> java.io.IOException: java.lang.ClassNotFoundException:
> com.example.spark.streaming.reporting.live.jobs.Bucket
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
>     at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>
> The Spark version is 1.2.0.
>
> The streaming job executes every 10 seconds with the following steps:
>
> 1. Consume JSON from a Kafka topic called journeys and convert it to case classes.
> 2. Filter the resulting journeys stream on whether a time attribute is set.
> 3. FlatMap the journeys into separate time buckets of 15, 60 and 1440 minutes,
>    e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360)).
> 4. ReduceByKey, adding the HyperLogLogs together.
> 5. UpdateStateByKey, merging each batch into the previous state's HyperLogLog.
> 6. Output the results to Cassandra.
>
> I have pasted a sample app below that mimics the problem, with all the classes
> in one file; it is also attached here as SampleJob.scala
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala>
>
> To get around the issue for the moment, I have removed the Bucket class and
> stopped passing a bucket array into the ActiveJourney class. Instead I
> hard-code all the time buckets I need in the ActiveJourney class; this
> approach works and recovers from checkpointing, but it is not extensible.
>
> Can the Spark gurus explain why I get that ClassNotFoundException?
>
> If you need any more information, please let me know.
>
> Much thanks,
> Conor
>
>
> package com.example.spark.streaming.reporting.live.jobs
>
> import java.util.Date
>
> import scala.Array.canBuildFrom
> import scala.collection.mutable.MutableList
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s.DefaultFormats
> import org.json4s.jackson.JsonMethods.parse
> import org.json4s.jvalue2extractable
> import org.json4s.string2JsonInput
>
> import com.example.spark.streaming.utils.MilliSecondUtils
> import com.example.spark.streaming.utils.constants.ColumnFamilies
> import com.example.spark.streaming.utils.constants.Constants
> import com.example.spark.streaming.utils.constants.Milliseconds
> import com.example.spark.streaming.utils.constants.SparkConfig
> import com.datastax.spark.connector.SomeColumns
> import com.datastax.spark.connector.streaming.toDStreamFunctions
> import com.datastax.spark.connector.toNamedColumnRef
> import com.twitter.algebird.HLL
> import com.twitter.algebird.HyperLogLogMonoid
>
> // Json parsing classes
> case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
> case class JourneyDetails(_id: String)
> case class JourneyCommand($set: Option[JourneySet])
> case class JourneySet(awayAt: Date)
>
> // Class not found bucket
> case class Bucket(val bucketType: String, val roundDown: (Long) => Long,
>   val columnFamily: String, val size: Long, val maxIntervals: Int)
>
> // used for updateStateByKey
> case class ActiveState(var bucketType: String, var time: Long,
>   var hyperLogLog: HLL, var ttl: Int)
>
> object SampleJob {
>
>   private final val Name = this.getClass().getSimpleName()
>
>   def main(args: Array[String]) {
>     if (args.length < 8) {
>       System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
>       System.exit(1)
>     }
>     System.out.print(args)
>
>     val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
>       cassandra, intervalSeconds) = args
>
>     val checkpointDirectory = hdfsUri + "/reporting/" + Name +
>       getClass().getPackage().getImplementationVersion()
>
>     def functionToCreateContext(): StreamingContext = {
>
>       // how many buckets
>       val fifteen = Bucket("QUARTER_HOUR",
>         MilliSecondUtils.roundDownToNearestQuarterHour,
>         ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
>       val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
>         ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
>       val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay,
>         ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)
>
>       val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))
>
>       val sparkConf = new SparkConf()
>         .setAppName(Name)
>         .set(SparkConfig.SparkMesosCoarse, Constants.True)
>         .set(SparkConfig.SparkCleanerTtl, "300")
>         .set(SparkConfig.SparkDriverMemory, "128m")
>         .set(SparkConfig.SparkExecutorMemory, "128m")
>         .set(SparkConfig.SparkDriverMaxResultSize, "128m")
>         .set(SparkConfig.SparkDefaultParallelism, "3")
>         .set(SparkConfig.SparkCoresMax, "2")
>         .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
>         .set(SparkConfig.SparkStreamingBlockInterval, "5000")
>         .set(SparkConfig.SparkCassandraConnectionHost, cassandra)
>
>       val scc = new StreamingContext(sparkConf, Seconds(intervalSeconds.toInt))
>       scc.checkpoint(checkpointDirectory)
>
>       val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>       val lines = KafkaUtils.createStream(scc, zkQuorum, group, topicMap).map(_._2)
>
>       // convert from Json to Scala classes
>       val journeys = lines.map(line => {
>         // lossless is used to extract milliseconds
>         implicit val formats = DefaultFormats.lossless
>         // 'parse' function comes from the json4s lib
>         parse(line).extract[Journey]
>       })
>
>       activeJourneys.process(journeys)
>       scc
>     }
>
>     val scc = StreamingContext.getOrCreate(checkpointDirectory,
>       functionToCreateContext _)
>
>     sys.ShutdownHookThread {
>       System.err.println(s"Gracefully stopping $Name Spark Streaming Application")
>       scc.stop(stopSparkContext = true, stopGracefully = true)
>       System.err.println(s"$Name streaming job stopped")
>     }
>
>     scc.start()
>     scc.awaitTermination()
>   }
> }
>
> class ActiveJourney(val buckets: Array[Bucket]) extends Serializable {
>
>   val BIT_SIZE = 12
>   val hll = new HyperLogLogMonoid(BIT_SIZE)
>
>   def process(journeys: DStream[Journey]) {
>     action(transform(journeys.filter(this.filter)))
>   }
>
>   def filter(journey: Journey): Boolean = {
>     val awayAt = journey.o
>       .getOrElse(new JourneyCommand(Some(new JourneySet(null))))
>       .$set.getOrElse(new JourneySet(null)).awayAt
>     val isSettingAwayAt = awayAt != null
>     isSettingAwayAt
>   }
>
>   def transform(stream: DStream[Journey]): DStream[ActiveState] = {
>     val bucketCounts = stream.flatMap(bucketize(_)).reduceByKey((x, y) => {
>       x.hyperLogLog += y.hyperLogLog
>       x
>     }).updateStateByKey(updateFunction).map(_._2)
>     bucketCounts
>   }
>
>   def action(stream: DStream[ActiveState]) = {
>     buckets.foreach(bucket => {
>       val filteredActiveJourneys = stream.filter(_.bucketType == bucket.bucketType)
>       val cassandraFormat = filteredActiveJourneys.map(total => ("website",
>         new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt))
>       cassandraFormat.saveToCassandra("reporting", bucket.columnFamily,
>         SomeColumns("key", "ts", "key_type", "total"))
>     })
>   }
>
>   def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState]) = {
>     val currentState = state.getOrElse(journeys(0))
>     journeys.foreach(x => {
>       currentState.hyperLogLog += x.hyperLogLog
>     })
>     currentState.ttl -= 1
>     if (currentState.ttl < 0) {
>       None
>     } else {
>       Some(currentState)
>     }
>   }
>
>   def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
>     var timeBuckets = MutableList.empty[(String, ActiveState)]
>     buckets.foreach(bucket => {
>       var timeBucket = bucket.roundDown(journey.o.get.$set.get.awayAt.getTime())
>       var journeyState = (bucket.bucketType + timeBucket,
>         ActiveState(bucket.bucketType, timeBucket,
>           hll(journey.o2.get._id.getBytes()), bucket.maxIntervals))
>       timeBuckets += journeyState
>     })
>     timeBuckets
>   }
> }
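One workaround that may be worth trying before restructuring the job -- assuming it is launched through spark-submit and packaged as an assembly jar (the jar name below is hypothetical) -- is to put the application jar on the driver's classpath explicitly, so that classes such as com.example.spark.streaming.reporting.live.jobs.Bucket are visible to the classloader in use while the checkpoint is read back:

# Hypothetical invocation: the same assembly jar is passed both as the
# application jar and on --driver-class-path, so that user classes can be
# resolved during checkpoint deserialization at driver startup.
spark-submit \
  --class com.example.spark.streaming.reporting.live.jobs.SampleJob \
  --driver-class-path sample-job-assembly-0.1.0.jar \
  sample-job-assembly-0.1.0.jar \
  <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>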
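As an aside on steps 4 and 5 above: the HyperLogLog merges in transform and updateFunction are just the Algebird monoid's plus, which unions two sketches. A tiny illustration, assuming only that algebird is on the classpath (the ids are made up):

import com.twitter.algebird.HyperLogLogMonoid

val hll = new HyperLogLogMonoid(12)   // 2^12 registers, as in SampleJob
val a = hll("journey-1".getBytes)     // sketch containing a single id
val b = hll("journey-2".getBytes)
val merged = hll.plus(a, b)           // union of the two sketches
println(merged.estimatedSize)         // approximately 2.0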