I am getting the following error when I kill the Spark driver and restart
the job:

15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
> file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
> file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> java.io.IOException: java.lang.ClassNotFoundException:
> com.example.spark.streaming.reporting.live.jobs.Bucket
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
> at
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)



Spark version is 1.2.0

The streaming job runs every 10 seconds and performs the following steps:

   1. Consumes JSON from a Kafka topic called journeys and converts it to
   case classes
   2. Filters the resulting journeys stream, keeping only journeys that have
   a time attribute set
   3. FlatMaps journeys into separate time buckets of 15, 60 and 1440 minutes,
   e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey
   id), 360) )
   4. ReduceByKey, adding the HyperLogLogs together
   5. UpdateStateByKey, to add to the previous state's HyperLogLog
   6. Outputs the results to Cassandra


I have made a sample app below to mimic the problem and put all classes
into one file.

To get around the issue for the moment, I have removed the Bucket class and
stopped passing in a bucket array to the ActiveJourney class.
Instead, I hard-code all the time buckets I need in the ActiveJourney
class; this approach works and recovers from checkpointing, but it is not
extensible.

Can the Spark gurus explain why I get that ClassNotFoundException?

If you need any more information, please let me know.

Many thanks,
Conor


package com.example.spark.streaming.reporting.live.jobs
> import java.util.Date
> import scala.Array.canBuildFrom
> import scala.collection.mutable.MutableList
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s.DefaultFormats
> import org.json4s.jackson.JsonMethods.parse
> import org.json4s.jvalue2extractable
> import org.json4s.string2JsonInput
> import com.example.spark.streaming.utils.MilliSecondUtils
> import com.example.spark.streaming.utils.constants.ColumnFamilies
> import com.example.spark.streaming.utils.constants.Constants
> import com.example.spark.streaming.utils.constants.Milliseconds
> import com.example.spark.streaming.utils.constants.SparkConfig
> import com.datastax.spark.connector.SomeColumns
> import com.datastax.spark.connector.streaming.toDStreamFunctions
> import com.datastax.spark.connector.toNamedColumnRef
> import com.twitter.algebird.HLL
> import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
// Root of a parsed journey message. Both parts are optional, so a message may
// carry a command, details, both, or neither.
case class Journey(
  o: Option[JourneyCommand],
  o2: Option[JourneyDetails]
)

// Details part of a journey message; carries the journey's `_id` field.
case class JourneyDetails(_id: String)

// Command part of a journey message; wraps the optional `$set` sub-document.
case class JourneyCommand($set: Option[JourneySet])

// Contents of a `$set` sub-document: the `awayAt` date.
case class JourneySet(awayAt: Date)
// Describes one time-bucket granularity used by the job.
//
//   bucketType    label of the granularity (e.g. "HOUR"); also used as key prefix
//   roundDown     maps a millisecond timestamp to the start of its bucket
//   columnFamily  Cassandra table the results for this bucket are written to
//   size          bucket size taken from the Milliseconds constants
//                 (not read anywhere in the code shown here)
//   maxIntervals  initial ttl given to every ActiveState created for this bucket
case class Bucket(
  bucketType: String,
  roundDown: Long => Long,
  columnFamily: String,
  size: Long,
  maxIntervals: Int
)
>
   // Running aggregate kept per (bucket type, bucket start time) key.
   //
   //   bucketType   label of the bucket this state belongs to
   //   time         bucket start as produced by Bucket.roundDown
   //   hyperLogLog  HyperLogLog sketch built from the journey ids seen so far
   //   ttl          remaining update rounds before the state is dropped
   //
   // The fields are declared `var` because ActiveJourney assigns to them.
   case class ActiveState(
     var bucketType: String,
     var time: Long,
     var hyperLogLog: HLL,
     var ttl: Int
   )

// Driver entry point of the sample streaming job. It builds the streaming
// context (or restores it from the checkpoint directory), wires the Kafka
// input into ActiveJourney and runs until termination.
object SampleJob {
  // Imported locally because the visible file-level imports only bring in
  // StreamingContext.toPairDStreamFunctions, not the StreamingContext type itself.
  import org.apache.spark.streaming.StreamingContext

  // getSimpleName on a Scala object keeps the trailing '$' ("SampleJob$").
  // The value is part of the checkpoint path, so it is deliberately left as is:
  // changing it would make existing checkpoints unreachable.
  private final val Name = this.getClass().getSimpleName()

  // Expects at least eight arguments:
  //   <enviroment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>
  // Prints the usage line and exits with status 1 when fewer are given.
  def main(args: Array[String]): Unit = {
    if (args.length < 8) {
      System.err.println(s"Usage: $Name <enviroment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
      System.exit(1)
    }
    // Print the argument values; printing the array itself only shows its
    // type and identity hash.
    System.out.println(args.mkString(" "))

    // `_*` accepts surplus trailing arguments. Without it, passing more than
    // eight arguments passed the length check above and then failed with a
    // MatchError. `environment` is accepted but not used further down.
    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
      cassandra, intervalSeconds, _*) = args

    val checkpointDirectory =
      hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    // Builds a fresh context. Passed to StreamingContext.getOrCreate below
    // together with the checkpoint directory.
    def functionToCreateContext(): StreamingContext = {

      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour,
        ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
        ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay,
        ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)
      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

      val sparkConf = new SparkConf()
        .setAppName(Name)
        .set(SparkConfig.SparkMesosCoarse, Constants.True)
        .set(SparkConfig.SparkCleanerTtl, "300")
        .set(SparkConfig.SparkDriverMemory, "128m")
        .set(SparkConfig.SparkExecutorMemory, "128m")
        .set(SparkConfig.SparkDriverMaxResultSize, "128m")
        .set(SparkConfig.SparkDefaultParallelism, "3")
        .set(SparkConfig.SparkCoresMax, "2")
        .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
        .set(SparkConfig.SparkStreamingBlockInterval, "5000")
        .set(SparkConfig.SparkCassandraConnectionHost, cassandra)

      val scc = new StreamingContext(sparkConf, Seconds(intervalSeconds.toInt))
      scc.checkpoint(checkpointDirectory)

      // Every topic in the comma-separated list is read with `numThreads` threads.
      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(scc, zkQuorum, group, topicMap).map(_._2)

      // convert from Json to Scala classes
      val journeys = lines.map(line => {
        // lossless is used to extract milliseconds
        implicit val formats = DefaultFormats.lossless
        // 'parse' function comes from the json4s lib
        parse(line).extract[Journey]
      })
      activeJourneys.process(journeys)
      scc
    }

    val scc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Stop the streaming job gracefully when the JVM shuts down.
    sys.ShutdownHookThread {
      System.err.println(s"Gracefully stopping $Name Spark Streaming Application")
      scc.stop(stopSparkContext = true, stopGracefully = true)
      System.err.println(s"$Name streaming job stopped")
    }
    scc.start()
    scc.awaitTermination()
  }
}
// Streaming pipeline that counts distinct journeys per time bucket with
// HyperLogLog sketches and writes the estimates to Cassandra.
//
// The DStream operations below reference methods of this instance, so the
// instance (including `buckets`) is captured by those closures; that is why the
// class is Serializable.
// NOTE(review): this presumably also makes `buckets` part of the checkpointed
// DStream graph - worth confirming when investigating checkpoint recovery.
class ActiveJourney(val buckets: Array[Bucket]) extends Serializable {
  // Size parameter handed to the HyperLogLog monoid.
  val BIT_SIZE = 12
  val hll = new HyperLogLogMonoid(BIT_SIZE)

  // Runs the whole pipeline: filter, aggregate per bucket, write out.
  def process(journeys: DStream[Journey]): Unit = {
    action(transform(journeys.filter(this.filter)))
  }

  // True only for journeys whose command carries a `$set` with a non-null awayAt.
  def filter(journey: Journey): Boolean =
    journey.o.flatMap(_.$set).exists(set => set.awayAt != null)

  // Expands journeys into per-bucket states, merges the sketches of one batch
  // per key, then folds them into the state kept across batches.
  def transform(stream: DStream[Journey]): DStream[ActiveState] = {
    val bucketCounts = stream
      .flatMap(bucketize(_))
      // Build a new state instead of mutating the left operand in place.
      .reduceByKey((x, y) => x.copy(hyperLogLog = x.hyperLogLog + y.hyperLogLog))
      .updateStateByKey(updateFunction)
      .map(_._2)
    bucketCounts
  }

  // Writes, for every configured bucket, the estimated distinct count of each
  // matching state to that bucket's column family in the "reporting" keyspace.
  def action(stream: DStream[ActiveState]): Unit = {
    buckets.foreach(bucket => {
      val filteredActiveJourneys = stream.filter(_.bucketType == bucket.bucketType)
      val cassandraFormat = filteredActiveJourneys.map(total =>
        ("website", new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt))
      cassandraFormat.saveToCassandra("reporting", bucket.columnFamily,
        SomeColumns("key", "ts", "key_type", "total"))
    })
  }

  // State update for one key.
  //
  // Starts from the previous state, or from the first new value when there is
  // no previous state, merges the sketches of all new values into it and
  // decrements the ttl by one. Returns None (dropping the key) once the ttl
  // falls below zero, or when there is neither a previous state nor a new
  // value; the latter used to fail on `journeys(0)`.
  //
  // The result is a copy: the previous state object is not modified.
  def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState]): Option[ActiveState] =
    state.orElse(journeys.headOption).flatMap { base =>
      val merged = journeys.foldLeft(base.hyperLogLog)((acc, next) => acc + next.hyperLogLog)
      val remaining = base.ttl - 1
      if (remaining < 0) {
        None
      } else {
        Some(base.copy(hyperLogLog = merged, ttl = remaining))
      }
    }

  // Produces one (key, state) pair per configured bucket for a journey, where
  // the key is the bucket type followed by the rounded-down awayAt time.
  //
  // Returns an empty list when the journey lacks the awayAt date or the `_id`
  // (`o2`). `filter` does not check `o2`, so the previous unconditional `.get`
  // calls could throw NoSuchElementException.
  def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
    val extracted = for {
      command <- journey.o
      set <- command.$set
      awayAt <- Option(set.awayAt)
      details <- journey.o2
    } yield (awayAt.getTime(), details._id)

    val timeBuckets = MutableList.empty[(String, ActiveState)]
    extracted.foreach { case (awayAtMillis, journeyId) =>
      // Fixed charset so the hashed bytes do not depend on the JVM default.
      val idBytes = journeyId.getBytes("UTF-8")
      buckets.foreach(bucket => {
        val timeBucket = bucket.roundDown(awayAtMillis)
        val journeyState = (bucket.bucketType + timeBucket,
          ActiveState(bucket.bucketType, timeBucket, hll(idBytes), bucket.maxIntervals))
        timeBuckets += journeyState
      })
    }
    timeBuckets
  }
}
>

Reply via email to