Re: Serialization of objects

2014-07-01 Thread Aaron Davidson
If you want to stick with Java serialization and need to serialize a
non-Serializable object, your best choices are probably to either subclass
it with a Serializable one or wrap it in a class of your own which
implements its own writeObject/readObject methods (see
http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java).
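
A minimal sketch of that wrapper idea for the BasicStringWrapper class
discussed below, assuming the object can be rebuilt from its underlying
string (the unwrap() accessor is assumed from the secondstring
StringWrapper interface and should be double-checked):

import java.io.{ObjectInputStream, ObjectOutputStream}
import com.wcohen.ss.BasicStringWrapper

// Serializable holder around a non-Serializable object: only the underlying
// string crosses the wire, and the wrapped object is rebuilt on read.
class SerializableBSW(@transient private var inner: BasicStringWrapper)
    extends Serializable {

  def get: BasicStringWrapper = inner

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeUTF(inner.unwrap()) // assumed accessor for the wrapped string
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    inner = new BasicStringWrapper(in.readUTF())
  }
}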

Otherwise you can use Kryo to register custom serializers for other
people's objects.
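
A sketch of that route for the same class, under the same assumption that
unwrap() exposes the raw string:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.wcohen.ss.BasicStringWrapper
import org.apache.spark.serializer.KryoRegistrator

// Kryo serializer that writes only the wrapped string, so the class
// itself never needs to implement java.io.Serializable.
class BSWSerializer extends Serializer[BasicStringWrapper] {
  override def write(kryo: Kryo, output: Output, w: BasicStringWrapper): Unit =
    output.writeString(w.unwrap()) // assumed accessor for the wrapped string

  override def read(kryo: Kryo, input: Input,
                    cls: Class[BasicStringWrapper]): BasicStringWrapper =
    new BasicStringWrapper(input.readString())
}

// Wired in the same way as the MyRegistrator quoted below.
class CustomRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[BasicStringWrapper], new BSWSerializer)
  }
}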


On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak ssti...@live.com wrote:

 Hi everyone,
 I was able to solve this issue. For now I changed the library code and
 added the following to the class com.wcohen.ss.BasicStringWrapper:

 public class BasicStringWrapper implements  Serializable

 However, I am still curious to know how to get around the issue when you
 don't have access to the code and you are using a 3rd party jar.


 --
 From: ssti...@live.com
 To: u...@spark.incubator.apache.org
 Subject: Serialization of objects
 Date: Thu, 26 Jun 2014 09:30:31 -0700


 Hi everyone,

 Aaron, thanks for your help so far. I am trying to serialize objects that
 I instantiate from a 3rd party library, namely instances of
 com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, even
 though I am (at least trying to be) using Kryo for serialization, I am
 still facing a serialization issue. I get
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 not serializable: java.io.NotSerializableException:
 com.wcohen.ss.BasicStringWrapper. Any help with this will be great.
 Scala code:

 package approxstrmatch

 import com.wcohen.ss.BasicStringWrapper
 import com.wcohen.ss.Jaccard

 import java.util.Iterator

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf

 import org.apache.spark.rdd
 import org.apache.spark.rdd.RDD

 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator

 class MyRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[approxstrmatch.JaccardScore])
     kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
     kryo.register(classOf[com.wcohen.ss.Jaccard])
   }
 }

 class JaccardScore {

   val mjc = new Jaccard() with Serializable
   val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077")
     .setAppName("ApproxStrMatch")
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")

   val sc = new SparkContext(conf)

   def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
     val jc_ = this.mjc

     var i: Int = 0
     for (sentence <- sourcerdd.toLocalIterator) {
       val str1 = new BasicStringWrapper(sentence)
       val scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
       val fileName = "/apps/software/scala-approsstrmatch-sentence" + i
       scorevector.saveAsTextFile(fileName)
       i += 1
     }
   }
 }

 Here is the script:
  val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt");
  val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt");
  val score = new approxstrmatch.JaccardScore()
  score.calculateScoreSecond(srcFile, distFile)

 O/P:

 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 textFile at console:12), which has no missing parents
 14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage
 0 (MappedRDD[3] at textFile at console:12)
 14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
 executor localhost: localhost (PROCESS_LOCAL)
 14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes
 in 4 ms
 14/06/25 12:32:05 INFO Executor: Running task ID 0
 14/06/25 12:32:05 INFO Executor: Fetching
 http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
 14/06/25 12:32:05 INFO Utils: Fetching
 http://serverip:47417/jars/approxstrmatch.jar to
 /tmp/fetchFileTemp8194323811657370518.tmp
 14/06/25 12:32:05 INFO Executor: Adding
 file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to
 class loader
 14/06/25 12:32:05 INFO Executor: Fetching
 http://serverip:47417/jars/secondstring-20140618.jar with timestamp
 1403724701562
 14/06/25 12:32:05 INFO Utils: Fetching
 http://serverip:47417/jars/secondstring-20140618.jar to
 /tmp/fetchFileTemp8711755318201511766.tmp
 14/06/25 12:32:06 INFO Executor: Adding
 file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar
 to class loader
 14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
 14/06/25 12:32:06 INFO HadoopRDD: Input split:
 hdfs://serverip:54310/data/dummy/test.txt:0+140
 14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
 14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
 14/06/25 12:32:06 INFO Executor: Finished task ID 0
 14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on
 localhost (progress: 1/1)
 14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
 14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
 have all completed, from pool
 14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371)
 finished in 0.242 s
 14/06/25 12:32:06 INFO SparkContext: Job finished: apply at
 Iterator.scala:371, took 0.34204941 s
 14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
 14/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at
 JaccardScore.scala:52
 14/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at
 JaccardScore.scala:52) with 2 output partitions (allowLocal=false)
 14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1 (saveAsTextFile
 at JaccardScore.scala:52)
 14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()
 14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()
 14/06/25 12:32:06 INFO DAGScheduler:
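
One likely reason the Kryo registration above does not make the error go
away: the closure passed to destrdd.map captures str1, a
BasicStringWrapper, and Spark of this vintage serializes task closures
with plain Java serialization regardless of the spark.serializer setting.
A minimal workaround sketch, keeping the scoring logic the same, is to
capture only the raw sentence string and build both wrappers inside the
closure:

// Drop-in variant of calculateScoreSecond from the code above: only the
// plain String `sentence` is captured by the map closure, and both
// BasicStringWrappers are constructed executor-side, so nothing
// non-serializable has to cross the wire.
def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
  val jc_ = this.mjc // must itself survive Java serialization of the closure

  var i: Int = 0
  for (sentence <- sourcerdd.toLocalIterator) {
    val scorevector = destrdd.map(x =>
      jc_.score(new BasicStringWrapper(sentence), new BasicStringWrapper(x)))
    scorevector.saveAsTextFile("/apps/software/scala-approsstrmatch-sentence" + i)
    i += 1
  }
}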
