I wouldn't expect a kafka producer to be serializable at all... among other
things, it has a background thread

On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <wansheng...@gmail.com
> wrote:

> Hi,
> Did anyone see java.util.ConcurrentModificationException when using
> broadcast variables?
> I encountered this exception when wrapping a Kafka producer like this in
> the spark streaming driver.
>
> Here is what I did.
> KafkaProducer<String, String> producer = new KafkaProducer<String,
> String>(properties);
> final Broadcast<KafkaDataProducer> bCastProducer
>     = streamingContext.sparkContext().broadcast(producer);
>
> Then within an closure called by a foreachRDD, I was trying to get the
> wrapped producer, i.e.
>  KafkaProducer<String, String> p = bCastProducer.value();
>
> after rebuilding and rerunning, I got the stack trace like this
>
> Exception in thread "main" com.esotericsoftware.kryo.KryoException:
> java.util.ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> context (java.security.AccessControlContext)
> acc (org.apache.spark.util.MutableURLClassLoader)
> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
> producer ("my driver")
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
> at
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
> at
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
> at
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> at
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
> at
> org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
> at "my driver"
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.util.ConcurrentModificationException
> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
> at java.util.Vector$Itr.next(Vector.java:1133)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> ... 41 more
>
> ​Thanks.​
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>

Reply via email to