It's a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea.
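Roughly the pattern I have in mind is a per-JVM holder that builds the producer on first use from the broadcast configuration, so only the Properties ever get serialized. Untested sketch; the holder class name is mine, and the Object stand-in marks where a real KafkaProducer<String, String> would be constructed (left out so this compiles without the Kafka jars on the classpath):

```java
import java.util.Properties;

// One producer per executor JVM, created lazily on first use.
// Double-checked locking with a volatile field keeps creation thread-safe.
class ProducerHolder {
    private static volatile Object producer; // really a KafkaProducer<String, String>

    static Object get(Properties config) {
        if (producer == null) {                       // first check, no lock
            synchronized (ProducerHolder.class) {
                if (producer == null) {               // second check, under lock
                    // In a real job: producer = new KafkaProducer<>(config);
                    producer = new Object();
                }
            }
        }
        return producer;
    }
}
```

Inside foreachPartition you would then call something like ProducerHolder.get(bCastConfig.value()), so every task on an executor shares the same producer instance instead of shipping one from the driver.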
Nitpick: the first code example has an extra `}` ;)

On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan <marcin.kut...@gmail.com>
wrote:

> As long as the Kafka producer is thread-safe, you don't need any pool at
> all. Just share a single producer on every executor. Please look at my
> blog post for more details:
> http://allegro.tech/spark-kafka-integration.html
>
> On Aug 19, 2015 2:00 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com>
> wrote:
>
>> All of you are right.
>>
>> I was trying to create too many producers. My idea was to create a
>> pool (for now the pool contains only one producer) shared by all the
>> executors. After I realized it was related to serialization issues
>> (though I did not find clear clues in the source code indicating that
>> the broadcast template type parameter must implement Serializable), I
>> followed the Spark Cassandra connector design and created a singleton
>> of Kafka producer pools. No exception has been noticed since.
>>
>> Thanks for all your comments.
>>
>> On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Why are you even trying to broadcast a producer? A broadcast variable
>>> is some immutable piece of serializable DATA that can be used for
>>> processing on the executors. A Kafka producer is neither DATA nor
>>> immutable, and definitely not serializable.
>>> The right way to do this is to create the producer in the executors.
>>> Please see the discussion in the programming guide:
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>
>>> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> I wouldn't expect a Kafka producer to be serializable at all...
>>>> among other things, it has a background thread.
>>>>
>>>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>>>> wansheng...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Did anyone see a java.util.ConcurrentModificationException when
>>>>> using broadcast variables? I encountered this exception when
>>>>> wrapping a Kafka producer like this in the Spark Streaming driver.
>>>>>
>>>>> Here is what I did:
>>>>>
>>>>> KafkaProducer<String, String> producer =
>>>>>     new KafkaProducer<String, String>(properties);
>>>>> final Broadcast<KafkaDataProducer> bCastProducer =
>>>>>     streamingContext.sparkContext().broadcast(producer);
>>>>>
>>>>> Then, within a closure called by foreachRDD, I was trying to get
>>>>> the wrapped producer, i.e.
>>>>>
>>>>> KafkaProducer<String, String> p = bCastProducer.value();
>>>>>
>>>>> After rebuilding and rerunning, I got a stack trace like this:
>>>>>
>>>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException:
>>>>> java.util.ConcurrentModificationException
>>>>> Serialization trace:
>>>>> classes (sun.misc.Launcher$AppClassLoader)
>>>>> classloader (java.security.ProtectionDomain)
>>>>> context (java.security.AccessControlContext)
>>>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>>>> producer ("my driver")
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>>>>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>   at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>>>>   at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>>>>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>>>>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>>>>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>>>>   at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>>>>   at "my driver"
>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>>>>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>>>>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>>>>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>>>>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>> Caused by: java.util.ConcurrentModificationException
>>>>>   at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>>>>   at java.util.Vector$Itr.next(Vector.java:1133)
>>>>>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>>>>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>   ... 41 more
>>>>>
>>>>> Thanks.
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Shenghua (Daniel) Wan
>>
>> --
>> Regards,
>> Shenghua (Daniel) Wan