Hello all, I am using Spark 1.0.2 and I have a custom receiver that works well.
I tried adding Kryo serialization to SparkConf:

    val spark = new SparkConf()
      …..
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I am getting a strange error that I am not sure how to solve:

    Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
    Serialization trace:
    object (com.typesafe.config.impl.SimpleConfig)
    atomFeedConf (com.twc.needle.cp.AtomFeedReceiver)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        ……

Here is part of my custom receiver:

    class AtomFeedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {
      private val conf = ConfigFactory.load
      private val atomFeedConf = conf.getConfig("cp.spark.atomfeed")
      private val atomFeedUrl = atomFeedConf.getString("url")
      private val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")
      private val urlReadTimeout = atomFeedConf.getInt("url_read_timeout")
      private val username = atomFeedConf.getString("username")
      private val password = atomFeedConf.getString("password")
      private val keepFeedCriteria = atomFeedConf.getString("keep_feed_criteria")
      private val feedTrackerDir = atomFeedConf.getString("feed_tracker_dir")
      private val feedTrackerFileName = atomFeedConf.getString("feed_tracker_file_name")
      private val enableSampling = atomFeedConf.getBoolean("enable_sampling")
      …..

Here is how I am calling the receiver in the main method:

    val logLineStreamRaw = ssc.receiverStream(new AtomFeedReceiver)

Any idea why Spark needs the Config object to be mutable only when Kryo serialization is enabled?

Thanks.
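
A possible workaround, offered as an untested sketch rather than a confirmed fix: Kryo's default FieldSerializer skips transient fields, so declaring the Config fields as `@transient lazy val` should keep them out of the serialized receiver and rebuild them on first use after deserialization. The self-contained example below shows only that mechanism. `FeedConf`, `FeedSource`, and the URL are hypothetical stand-ins, and it round-trips through plain Java serialization rather than Kryo or Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a config object that a serializer cannot rebuild.
// It is deliberately NOT Serializable.
final class FeedConf(val url: String)

// Hypothetical stand-in for the receiver.
class FeedSource(path: String) extends Serializable {
  // Transient: never written out. Lazy: re-created on first access,
  // including the first access after deserialization.
  @transient private lazy val feedConf: FeedConf =
    new FeedConf("http://example.invalid/" + path)

  // Plain values read from the config are serialized as usual.
  private val url: String = feedConf.url

  def currentUrl: String = url          // value carried through serialization
  def confUrl: String = feedConf.url    // forces the lazy val to be rebuilt
}

object TransientLazyValDemo {
  // Serialize to a byte array and read the object back (Java serialization,
  // used here only as an assumption-free way to exercise transient fields).
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new FeedSource("feed"))
    println(copy.currentUrl)
    println(copy.confUrl)
  }
}
```

Applied to the receiver, that would mean declaring `conf` and `atomFeedConf` as `@transient private lazy val`, while the String, Int, and Boolean values read from them stay ordinary vals.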