Re: Serialization of objects
If you want to stick with Java serialization and need to serialize a non-Serializable object, your best choices are probably to either subclass it with a Serializable one or wrap it in a class of your own which implements its own writeObject/readObject methods (see here: http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java). Otherwise you can use Kryo to register custom serializers for other people's objects.

On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak ssti...@live.com wrote:

Hi everyone,

I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper:

public class BasicStringWrapper implements Serializable

However, I am still curious to know how to get around the issue when you don't have access to the code and you are using a 3rd-party jar.

From: ssti...@live.com
To: u...@spark.incubator.apache.org
Subject: Serialization of objects
Date: Thu, 26 Jun 2014 09:30:31 -0700

Hi everyone,

Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd-party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to be) using Kryo for serialization, but I am still facing the serialization issue. I get:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper

Any help with this will be great.
Scala code:

package approxstrmatch

import com.wcohen.ss.BasicStringWrapper
import com.wcohen.ss.Jaccard
import java.util.Iterator
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[approxstrmatch.JaccardScore])
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    kryo.register(classOf[com.wcohen.ss.Jaccard])
  }
}

class JaccardScore {
  val mjc = new Jaccard() with Serializable
  val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)

  def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
    val jc_ = this.mjc
    var i: Int = 0
    for (sentence <- sourcerdd.toLocalIterator) {
      val str1 = new BasicStringWrapper(sentence)
      var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
      val fileName = "/apps/software/scala-approsstrmatch-sentence" + i
      scorevector.saveAsTextFile(fileName)
      i += 1
    }
  }
}

Here is the script:

val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt")
val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt")
val score = new approxstrmatch.JaccardScore()
score.calculateScoreSecond(srcFile, distFile)

O/P:

14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents
14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12)
14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
14/06/25 12:32:05 INFO Executor: Running task ID 0
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
14/06
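The wrapper approach suggested above can be sketched as follows. This is a minimal illustration, not the library's actual API: `LegacyPoint` is a hypothetical stand-in for a non-Serializable third-party class like com.wcohen.ss.BasicStringWrapper, and its fields are assumptions. The wrapper marks the wrapped reference `transient` and uses writeObject/readObject to persist just enough state to rebuild the object on deserialization.

```java
import java.io.*;

// Hypothetical stand-in for a 3rd-party class that does not implement Serializable.
class LegacyPoint {
    private final int x;
    private final int y;
    LegacyPoint(int x, int y) { this.x = x; this.y = y; }
    int getX() { return x; }
    int getY() { return y; }
}

// Serializable wrapper: the wrapped object itself is never serialized;
// writeObject/readObject save and restore the state needed to rebuild it.
class PointWrapper implements Serializable {
    private transient LegacyPoint point; // skipped by default serialization

    PointWrapper(LegacyPoint point) { this.point = point; }

    LegacyPoint get() { return point; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeInt(point.getX());
        out.writeInt(point.getY());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        point = new LegacyPoint(in.readInt(), in.readInt()); // rebuild the wrapped object
    }
}

public class WrapperDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new PointWrapper(new LegacyPoint(3, 4)));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            PointWrapper copy = (PointWrapper) in.readObject();
            System.out.println(copy.get().getX() + "," + copy.get().getY()); // prints "3,4"
        }
    }
}
```

In a Spark job you would ship the wrapper through the closure and call `get()` inside the task, so only the wrapper's hand-written state ever crosses the wire.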
RE: Serialization of objects
Hi everyone,

I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper:

public class BasicStringWrapper implements Serializable

However, I am still curious to know how to get around the issue when you don't have access to the code and you are using a 3rd-party jar.

From: ssti...@live.com
To: u...@spark.incubator.apache.org
Subject: Serialization of objects
Date: Thu, 26 Jun 2014 09:30:31 -0700

Hi everyone,

Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd-party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to be) using Kryo for serialization, but I am still facing the serialization issue. I get:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper

Any help with this will be great.
Scala code:

package approxstrmatch

import com.wcohen.ss.BasicStringWrapper
import com.wcohen.ss.Jaccard
import java.util.Iterator
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[approxstrmatch.JaccardScore])
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    kryo.register(classOf[com.wcohen.ss.Jaccard])
  }
}

class JaccardScore {
  val mjc = new Jaccard() with Serializable
  val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)

  def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
    val jc_ = this.mjc
    var i: Int = 0
    for (sentence <- sourcerdd.toLocalIterator) {
      val str1 = new BasicStringWrapper(sentence)
      var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
      val fileName = "/apps/software/scala-approsstrmatch-sentence" + i
      scorevector.saveAsTextFile(fileName)
      i += 1
    }
  }
}

Here is the script:

val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt")
val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt")
val score = new approxstrmatch.JaccardScore()
score.calculateScoreSecond(srcFile, distFile)

O/P:

14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents
14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12)
14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
14/06/25 12:32:05 INFO Executor: Running task ID 0
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
14/06/25 12:32:06 INFO Executor: Finished task ID 0
14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)
14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s
14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s
14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
14/06/25 12:32
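Sameer's fix above (adding `implements Serializable` to the library source) can also be achieved without touching the 3rd-party code by subclassing, the other workaround mentioned in this thread. A caveat: Java serialization re-runs the no-argument constructor of the nearest non-serializable superclass on deserialization, so the superclass needs an accessible no-arg constructor and its fields must be restored by hand. A minimal sketch, where `LegacyName` is a hypothetical stand-in for a class like BasicStringWrapper:

```java
import java.io.*;

// Hypothetical stand-in for a 3rd-party class: not Serializable,
// but it does have an accessible no-arg constructor.
class LegacyName {
    private String value = "";
    LegacyName() {}
    LegacyName(String value) { this.value = value; }
    void setValue(String v) { value = v; }
    String getValue() { return value; }
}

// Serializable subclass. Superclass fields are NOT serialized, so we keep
// a serializable copy of the state and push it back into the superclass
// after deserialization (the no-arg LegacyName() ran and reset it).
class SerializableName extends LegacyName implements Serializable {
    private final String saved;

    SerializableName(String value) {
        super(value);
        this.saved = value;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores 'saved'
        setValue(saved);        // re-establish the superclass state
    }
}

public class SubclassDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new SerializableName("jaccard"));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            SerializableName copy = (SerializableName) in.readObject();
            System.out.println(copy.getValue()); // prints "jaccard"
        }
    }
}
```

If the 3rd-party class lacks a no-arg constructor, this route fails at read time and the wrapper approach (or a Kryo custom serializer) is the better fit.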
Serialization of objects
Hi everyone,

Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd-party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to be) using Kryo for serialization, but I am still facing the serialization issue. I get:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper

Any help with this will be great.

Scala code:

package approxstrmatch

import com.wcohen.ss.BasicStringWrapper
import com.wcohen.ss.Jaccard
import java.util.Iterator
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[approxstrmatch.JaccardScore])
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    kryo.register(classOf[com.wcohen.ss.Jaccard])
  }
}

class JaccardScore {
  val mjc = new Jaccard() with Serializable
  val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)

  def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
    val jc_ = this.mjc
    var i: Int = 0
    for (sentence <- sourcerdd.toLocalIterator) {
      val str1 = new BasicStringWrapper(sentence)
      var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
      val fileName = "/apps/software/scala-approsstrmatch-sentence" + i
      scorevector.saveAsTextFile(fileName)
      i += 1
    }
  }
}

Here is the script:

val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt")
val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt")
val score = new approxstrmatch.JaccardScore()
score.calculateScoreSecond(srcFile, distFile)

O/P:

14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents
14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12)
14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
14/06/25 12:32:05 INFO Executor: Running task ID 0
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
14/06/25 12:32:06 INFO Executor: Finished task ID 0
14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)
14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s
14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s
14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
14/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at JaccardScore.scala:52
14/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at JaccardScore.scala:52) with 2 output partitions (allowLocal=false)
14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at JaccardScore.scala:52)
14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()
14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()
14/06/25 12:32:06 INFO DAGScheduler:
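The root cause of the error in this thread is that the `map` closure captures `jc_` (a Jaccard) and `str1` (a BasicStringWrapper), so Spark must serialize them to ship the task, and Java serialization throws NotSerializableException naming the first non-serializable object it hits. The mechanism can be reproduced with plain JDK classes; `ThirdPartyScorer` below is a hypothetical stand-in, not the secondstring library:

```java
import java.io.*;

// Hypothetical stand-in for a 3rd-party class like com.wcohen.ss.Jaccard
// that does not implement Serializable.
class ThirdPartyScorer {
    double score(String a, String b) { return a.equals(b) ? 1.0 : 0.0; }
}

// Mirrors a Spark task closure: the enclosing class is Serializable, but it
// captures a reference to a non-serializable object, so serialization fails.
class CapturingTask implements Serializable {
    ThirdPartyScorer scorer = new ThirdPartyScorer();
}

public class ClosureDemo {
    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new CapturingTask());
        } catch (NotSerializableException e) {
            // The exception message is the offending class name, just as Spark's
            // stack trace names com.wcohen.ss.BasicStringWrapper.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

Registering a class with Kryo only helps if Spark actually serializes it with Kryo; task closures in this era of Spark go through Java serialization, which is why the registrations above did not make the error go away.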