Hi All,
I am trying to move away from spark-shell to spark-submit and have been making
some code changes. However, I am now having a problem with serialization. It
used to work fine before the code update, and I am not sure what I did wrong.
Here is the code:
JaccardScore.scala
package approxstrmatch
// Posted excerpt of the JaccardScore class; parts of the body are elided by the author.
// NOTE(review): main is declared on a class rather than an object; a JVM entry point
// needs a static main, which Scala only generates for objects — confirm how this is launched.
class JaccardScore {
// Jaccard instance created as an anonymous subclass that also mixes in Serializable.
val mjc = new Jaccard() with Serializable
// Builds the SparkConf (app name, storage memory fraction, Kryo serializer and
// Kryo registrator), creates the SparkContext, runs the scoring call and stops the context.
def main(args: Array[String]) {
// NOTE(review): the statements below are hard-wrapped and two of them share a line
// without a separator — presumably an artifact of how the post was formatted.
val conf = new
SparkConf().setAppName("ApproxStrMatch").set("spark.storage.memoryFraction",
"0.0") conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
val sc = new SparkContext(conf) // More code here…
// NOTE(review): score, srcFile and distFile are not defined in the visible excerpt —
// presumably they are set up in the elided code above; verify.
score.calculateSortedJaccardScore(srcFile, distFile) sc.stop()
}
// Takes a source and a destination RDD of strings; the body is elided in this excerpt.
def calculateSortedJaccardScore (sourcerdd: RDD[String], destrdd: RDD[String])
{ // Code over here…}
MyRegistrator.scala: This is the central place for registering all the classes.
package approxstrmatch
import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard;import
com.wcohen.ss.Level2MongeElkan;import com.wcohen.ss.Levenstein;import
com.wcohen.ss.ScaledLevenstein;import com.wcohen.ss.Jaro;import
com.wcohen.ss.JensenShannonDistance;
import com.esotericsoftware.kryo.Kryo// import
org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
/**
 * Central Kryo registrator: registers the application's classes with the
 * supplied Kryo instance.
 */
class MyRegistrator extends KryoRegistrator {

  /**
   * Registers each class that should be serialized through Kryo.
   *
   * @param kryo the Kryo instance to register classes on
   */
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[approxstrmatch.JaccardScore])
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    kryo.register(classOf[com.wcohen.ss.Jaccard])
    // Bunch of other registrations here.
  }
}
I run it as:
spark-submit --class approxstrmatch.JaccardDriver --master local
--executor-memory 8G
/apps/sameert/software/approxstrmatch/target/scala-2.10/classes/approxstrmatch_2.10-1.0.jar
I get the following error message:
java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: approxstrmatch.JaccardScore
    at com.esotericsoftware.kryo.Kryo.newSerializer(Kryo.java:335)
    at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:314)
    at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:49)
    at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:307)
    at com.esotericsoftware.kryo.Kryo.register(Kryo.java:351)
    at approxstrmatch.MyRegistrator.registerClasses(MyRegistrator.scala:18)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:77)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:73)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:73)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:130)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:92)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:995)
    at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:80)
    at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:66)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:847)
    at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:205)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
    at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
    at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)