Could you come up with a minimal example with which I can reproduce the
problem?
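
For reference, here is a stripped-down sketch of the kind of job I would expect
to hit the same code path (a user-defined case class referenced from a
checkpointed DStream graph, recovered via getOrCreate). The class, host and
checkpoint-directory names below are illustrative only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// User-defined class that ends up serialized in the checkpointed DStream graph
case class Event(id: String)

object CheckpointRecoveryRepro {
  def main(args: Array[String]) {
    val checkpointDir = "hdfs:///tmp/checkpoint-repro" // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("CheckpointRecoveryRepro")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // Any input will do; the point is that the graph references Event
      ssc.socketTextStream("localhost", 9999)
        .map(Event(_))
        .map(e => (e.id, 1L))
        .updateStateByKey[Long]((values: Seq[Long], state: Option[Long]) =>
          Some(values.sum + state.getOrElse(0L)))
        .print()
      ssc
    }

    // First run writes the checkpoint; kill the driver, start again, and
    // recovery has to deserialize the graph that references Event.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}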

On Tue, Feb 10, 2015 at 12:30 PM, conor <fennell.co...@gmail.com> wrote:

> I am getting the following error when I kill the Spark driver and restart
> the job:
>
> 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
>         at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>
>
>
> Spark version is 1.2.0
>
> The streaming job executes every 10 seconds with the following steps:
>
>   1. Consume JSON from a Kafka topic called journeys and convert it to case classes
>   2. Filter the resulting journeys stream on a time attribute being set
>   3. FlatMap each journey into separate time buckets of 15, 60 and 1440 minutes,
>      e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journeyId), 360))
>      (see the sketch just after this list)
>   4. ReduceByKey, adding the HyperLogLogs together
>   5. UpdateStateByKey to add to the previous state's HyperLogLog
>   6. Output the results to Cassandra
>
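> (To show steps 3 to 5 in isolation, this is roughly how the bucket keys and
> the HyperLogLog merging behave with Algebird; the journey ids and the
> timestamp below are made up:)
>
>   import com.twitter.algebird.{HLL, HyperLogLogMonoid}
>
>   val hll = new HyperLogLogMonoid(12) // same bit size as in the job below
>
>   // Step 3: one (bucketKey, HLL of a single journey id) pair per bucket
>   val key = "HOUR" + 1234569000L // bucketType + rounded-down timestamp
>   val a: HLL = hll("journey-1".getBytes)
>   val b: HLL = hll("journey-2".getBytes)
>
>   // Steps 4 and 5: reduceByKey / updateStateByKey just add the HLLs together
>   val merged: HLL = a + b
>   println(merged.estimatedSize) // roughly 2.0
>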
>
> I have pasted a sample app below that mimics the problem, with all classes in
> one file; it is also attached here: SampleJob.scala
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala>
>
> To get around the issue for the moment, I have removed the Bucket class and
> stopped passing a bucket array into the ActiveJourney class. Instead I
> hard-code all the time buckets I need in the ActiveJourney class; this
> approach works and recovers from checkpointing, but it is not extensible.
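>
> In other words the working variant looks roughly like this (illustrative only,
> not the exact code), with the buckets fixed inside the class instead of being
> passed in as an Array[Bucket]:
>
>   import com.twitter.algebird.HyperLogLogMonoid
>
>   class ActiveJourney extends Serializable {
>     val hll = new HyperLogLogMonoid(12)
>     // The buckets are hard-coded here, so no user-defined Bucket instances
>     // end up in the checkpointed DStream graph.
>     private def bucketKeys(timeMs: Long): Seq[(String, Long)] = Seq(
>       ("QUARTER_HOUR", timeMs - timeMs % (15L * 60 * 1000)),
>       ("HOUR", timeMs - timeMs % (60L * 60 * 1000)),
>       ("DAY", timeMs - timeMs % (24L * 60 * 60 * 1000)))
>   }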
>
> Can the Spark gurus explain why I get that ClassNotFoundException?
>
> If you need any more information, please let me know.
>
> Many thanks,
> Conor
>
>
>
> package com.example.spark.streaming.reporting.live.jobs
> import java.util.Date
> import scala.Array.canBuildFrom
> import scala.collection.mutable.MutableList
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s.DefaultFormats
> import org.json4s.jackson.JsonMethods.parse
> import org.json4s.jvalue2extractable
> import org.json4s.string2JsonInput
> import com.example.spark.streaming.utils.MilliSecondUtils
> import com.example.spark.streaming.utils.constants.ColumnFamilies
> import com.example.spark.streaming.utils.constants.Constants
> import com.example.spark.streaming.utils.constants.Milliseconds
> import com.example.spark.streaming.utils.constants.SparkConfig
> import com.datastax.spark.connector.SomeColumns
> import com.datastax.spark.connector.streaming.toDStreamFunctions
> import com.datastax.spark.connector.toNamedColumnRef
> import com.twitter.algebird.HLL
> import com.twitter.algebird.HyperLogLogMonoid
> // Json parsing classes
> case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
> case class JourneyDetails(_id: String)
> case class JourneyCommand($set: Option[JourneySet])
> case class JourneySet(awayAt: Date)
> // The class that cannot be found when recovering from the checkpoint
> case class Bucket(bucketType: String, roundDown: (Long) => Long,
>   columnFamily: String, size: Long, maxIntervals: Int)
>
> // Used for updateStateByKey
> case class ActiveState(var bucketType: String, var time: Long,
>   var hyperLogLog: HLL, var ttl: Int)
>
> object SampleJob {
>  private final val Name = this.getClass().getSimpleName()
>  def main(args: Array[String]) {
>    if (args.length < 8) {
>      System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
>      System.exit(1)
>    }
>    // log the raw arguments
>    System.out.println(args.mkString(" "))
>    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args
>    val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()
>    def functionToCreateContext(): StreamingContext = {
>
>      // how many buckets
>      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
>      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
>      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)
>      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))
>      val sparkConf = new SparkConf()
>        .setAppName(Name)
>        .set(SparkConfig.SparkMesosCoarse, Constants.True)
>        .set(SparkConfig.SparkCleanerTtl, "300")
>        .set(SparkConfig.SparkDriverMemory, "128m")
>        .set(SparkConfig.SparkExecutorMemory, "128m")
>        .set(SparkConfig.SparkDriverMaxResultSize, "128m")
>        .set(SparkConfig.SparkDefaultParallelism, "3")
>        .set(SparkConfig.SparkCoresMax, "2")
>        .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
>        .set(SparkConfig.SparkStreamingBlockInterval, "5000")
>        .set(SparkConfig.SparkCassandraConnectionHost, cassandra)
>      val scc = new StreamingContext(sparkConf, Seconds(intervalSeconds.toInt))
>      scc.checkpoint(checkpointDirectory)
>      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>      val lines = KafkaUtils.createStream(scc, zkQuorum, group, topicMap).map(_._2)
>      // convert from Json to Scala classes
>      val journeys = lines.map(line => {
>        // lossless is used to extract milliseconds
>        implicit val formats = DefaultFormats.lossless
>        // 'parse' function comes from the json4s lib
>        parse(line).extract[Journey]
>      })
>      activeJourneys.process(journeys)
>      scc
>    }
>    val scc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
>    sys.ShutdownHookThread {
>      System.err.println(s"Gracefully stopping $Name Spark Streaming Application")
>      scc.stop(stopSparkContext = true, stopGracefully = true)
>      System.err.println(s"$Name streaming job stopped")
>    }
>    scc.start()
>    scc.awaitTermination()
>  }
> }
> class ActiveJourney(val buckets: Array[Bucket]) extends Serializable {
>  val BIT_SIZE = 12
>  val hll = new HyperLogLogMonoid(BIT_SIZE)
>
>  def process(journeys: DStream[Journey]) {
>    action(transform(journeys.filter(this.filter)))
>  }
>  def filter(journey: Journey): Boolean = {
>    val awayAt = journey.o
>      .getOrElse(new JourneyCommand(Some(new JourneySet(null))))
>      .$set.getOrElse(new JourneySet(null))
>      .awayAt
>    val isSettingAwayAt = awayAt != null
>    isSettingAwayAt
>  }
>  def transform(stream: DStream[Journey]): DStream[ActiveState] = {
>    val bucketCounts = stream.flatMap(bucketize(_)).reduceByKey((x, y) => {
>      x.hyperLogLog += y.hyperLogLog
>      x
>    }).updateStateByKey(updateFunction).map(_._2)
>    bucketCounts
>  }
>  def action(stream: DStream[ActiveState]) = {
>    buckets.foreach(bucket => {
>      val filteredActiveJourneys = stream.filter(_.bucketType == bucket.bucketType)
>      val cassandraFormat = filteredActiveJourneys.map(total =>
>        ("website", new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt))
>      cassandraFormat.saveToCassandra("reporting", bucket.columnFamily,
>        SomeColumns("key", "ts", "key_type", "total"))
>    })
>  }
>  def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState]) = {
>    val currentState = state.getOrElse(journeys(0))
>    journeys.foreach(x => {
>      currentState.hyperLogLog += x.hyperLogLog
>    })
>    currentState.ttl -= 1
>    if (currentState.ttl < 0) {
>      None
>    } else {
>      Some(currentState)
>    }
>  }
>  def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
>    val timeBuckets = MutableList.empty[(String, ActiveState)]
>    buckets.foreach(bucket => {
>      val timeBucket = bucket.roundDown(journey.o.get.$set.get.awayAt.getTime())
>      val journeyState = (bucket.bucketType + timeBucket,
>        ActiveState(bucket.bucketType, timeBucket, hll(journey.o2.get._id.getBytes()), bucket.maxIntervals))
>      timeBuckets += journeyState
>    })
>    timeBuckets
>  }
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-throwing-ClassNotFound-exception-when-recovering-from-checkpointing-tp21582.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
