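I wouldn't expect a Kafka producer to be serializable at all. Among other things, it has a background thread, and the serialization trace below shows Kryo walking from the producer's ioThread (a KafkaThread) into its context class loader.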
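
One common way around this is to ship only the serializable configuration and build the client lazily in whichever JVM ends up using it. Below is a minimal sketch of that pattern using plain Java serialization. FakeProducer is a hypothetical stand-in for KafkaProducer (it just owns a thread and echoes what it is given), and LazyProducer, LazyProducerDemo and the "localhost:9092" address are likewise made up for illustration. I have not run this against Spark or Kryo. My understanding is that Kryo's default FieldSerializer also skips transient fields, but treat that as an assumption to verify.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for KafkaProducer: it owns a background thread
// and is deliberately NOT Serializable.
class FakeProducer {
    private final Properties config;
    private final Thread ioThread;

    FakeProducer(Properties config) {
        this.config = config;
        this.ioThread = new Thread(() -> { }, "fake-producer-io");
        this.ioThread.setDaemon(true);
        this.ioThread.start();
    }

    // Pretends to send; returns a string so the result is easy to inspect.
    String send(String value) {
        return config.getProperty("bootstrap.servers") + " <- " + value;
    }
}

// Serializable holder: only the Properties travel over the wire.
// The client itself is transient and is created on first use in each JVM.
class LazyProducer implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Properties config;
    private transient FakeProducer producer; // null again after deserialization

    LazyProducer(Properties config) {
        this.config = config;
    }

    synchronized FakeProducer get() {
        if (producer == null) {
            producer = new FakeProducer(config);
        }
        return producer;
    }

    synchronized boolean isInitialized() {
        return producer != null;
    }
}

class LazyProducerDemo {
    // Serializes and deserializes the holder, roughly what happens when
    // an object is shipped from the driver to an executor.
    static LazyProducer roundTrip(LazyProducer holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyProducer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        LazyProducer original = new LazyProducer(props);
        original.get(); // the "driver" copy now owns a live client and thread

        LazyProducer copy = roundTrip(original);
        System.out.println(copy.isInitialized());     // prints "false"
        System.out.println(copy.get().send("hello")); // prints "localhost:9092 <- hello"
    }
}
```

In a streaming job you would broadcast the holder (or just the Properties) rather than the producer, and call get() inside the closure that runs on the executors, for example inside foreachPartition, so each executor JVM ends up with its own producer.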
On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <wansheng...@gmail.com> wrote:
> Hi,
> Did anyone see java.util.ConcurrentModificationException when using
> broadcast variables?
> I encountered this exception when wrapping a Kafka producer like this in
> the spark streaming driver.
>
> Here is what I did.
> KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
> final Broadcast<KafkaDataProducer> bCastProducer
>     = streamingContext.sparkContext().broadcast(producer);
>
> Then within an closure called by a foreachRDD, I was trying to get the
> wrapped producer, i.e.
> KafkaProducer<String, String> p = bCastProducer.value();
>
> after rebuilding and rerunning, I got the stack trace like this
>
> Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> context (java.security.AccessControlContext)
> acc (org.apache.spark.util.MutableURLClassLoader)
> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
> producer ("my driver")
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>     at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>     at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>     at "my driver"
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>     at java.util.Vector$Itr.next(Vector.java:1133)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     ... 41 more
>
> Thanks.
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>