Hello all,

I am using Spark 1.0.2 and I have a custom receiver that works well.

I tried adding Kryo serialization to SparkConf:

val spark = new SparkConf()
    …..
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I am getting a strange error that I am not sure how to solve:

Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (com.typesafe.config.impl.SimpleConfig)
atomFeedConf (com.twc.needle.cp.AtomFeedReceiver)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

        ……

Here is part of my custom receiver:

class AtomFeedReceiver
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  private val conf = ConfigFactory.load
  private val atomFeedConf = conf.getConfig("cp.spark.atomfeed")

  private val atomFeedUrl = atomFeedConf.getString("url")
  private val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")
  private val urlReadTimeout = atomFeedConf.getInt("url_read_timeout")
  private val username = atomFeedConf.getString("username")
  private val password = atomFeedConf.getString("password")
  private val keepFeedCriteria = atomFeedConf.getString("keep_feed_criteria")
  private val feedTrackerDir = atomFeedConf.getString("feed_tracker_dir")
  private val feedTrackerFileName = atomFeedConf.getString("feed_tracker_file_name")
  private val enableSampling = atomFeedConf.getBoolean("enable_sampling")

  …..


Here is how I am calling the receiver in the main method:

val logLineStreamRaw = ssc.receiverStream(new AtomFeedReceiver)

Any idea why Spark needs the Config object to be mutable only when Kryo 
serialization is enabled?
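
For reference, the serialization trace above suggests that Kryo is rebuilding the atomFeedConf field of the receiver one field at a time and calling Map.put on the Config's underlying object. A pattern often used to keep such a helper out of the serialized form is to declare it as a @transient lazy val, so that it is skipped when the object is written and re-created on first use after it is read back. Below is a minimal sketch of that field layout. It uses only JDK serialization so that it runs without Spark, Kryo, or Typesafe Config on the classpath, and FakeConfig, SketchReceiver, and TransientSketch are hypothetical stand-ins, not the real classes. As far as I know Kryo's FieldSerializer also skips transient fields by default, but whether this removes the error on Spark 1.0.2 is untested.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a settings object that should not be serialized.
// It deliberately does NOT extend Serializable.
class FakeConfig {
  def getString(key: String): String = s"value-of-$key"
}

// Hypothetical stand-in for the receiver; only the field layout matters here.
class SketchReceiver extends Serializable {
  // @transient keeps the field out of the serialized form;
  // lazy re-creates it on first access after deserialization.
  @transient private lazy val conf = new FakeConfig

  // Plain values read from the config are serialized as ordinary strings.
  val url: String = conf.getString("url")

  // Touches conf again, which forces re-creation on a deserialized copy.
  def lookup(key: String): String = conf.getString(key)
}

object TransientSketch {
  // Serialize a value to bytes and read it back (JDK serialization, not Kryo).
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new SketchReceiver)
    println(copy.url)                // value captured before serialization
    println(copy.lookup("username")) // conf is rebuilt lazily on the copy
  }
}
```

In the receiver above, the equivalent change would be to mark conf and atomFeedConf as @transient lazy val and leave the extracted String, Int, and Boolean settings as they are.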

Thanks.

