[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901609#comment-14901609
 ] 

Jon Buffington commented on SPARK-5569:
---------------------------------------

Cody,

We were able to work around the issue by destructuring the OffsetRange type 
into its parts (i.e., string, long, ints). Below is our flow for reference. Our 
only concern is whether the transform operation runs at the driver, as we 
use a singleton on the driver to persist the last captured offsets. Can you 
confirm that the stream transform runs at the driver?

{noformat}
// Recover or create the stream.
val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
  createStreamingContext(checkpointPath)
})
...
def createStreamingContext(checkpointPath: String): StreamingContext = {
  val ssc = new StreamingContext(SparkConf,  Minutes(1))
  ssc.checkpoint(checkpointPath)
  createStream(ssc)
  ssc
}
...

def createStream(ssc: StreamingContext): Unit = {
...
  KafkaUtils.createDirectStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, 
messageHandler)
    // "Note that the typecast to HasOffsetRanges will only succeed if it is
    // done in the first method called on the directKafkaStream, not later down
    // a chain of methods. You can use transform() instead of foreachRDD() as
    // your first method call in order to access offsets, then call further
    // Spark methods. However, be aware that the one-to-one mapping between RDD
    // partition and Kafka partition does not remain after any methods that
    // shuffle or repartition, e.g. reduceByKey() or window()."
    .transform { rdd =>
      ...
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // De-structure the OffsetRange type into its parts and save in a singleton.
      Ranger.captureOffsetRanges(offsetRanges)
      ...
      rdd
    }
    ...
}
{noformat}


> Checkpoints cannot reference classes defined outside of Spark's assembly
> ------------------------------------------------------------------------
>
>                 Key: SPARK-5569
>                 URL: https://issues.apache.org/jira/browse/SPARK-5569
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-1421100410000.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-1421100410000.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
>         at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
>         at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>         at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at 
> org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
>         at 
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
>         at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
>         at example.CheckpointedExample.main(CheckpointedExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at 
> org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
>         at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at 
> scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at 
> scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>         at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         ... 52 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to