I'm glad that I could help :)
On 19 Aug 2015 8:52 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com> wrote:
> +1
>
> I wish I had read this blog earlier. I am using Java and have just
> implemented a singleton producer per executor/JVM during the day.
> Yes, I did see that NotSerializableException when I was debugging the code
> ...
>
> Thanks for sharing.
>
> On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> It's a cool blog post! Tweeted it!
>> Broadcasting the configuration necessary for lazily instantiating the
>> producer is a good idea.
>>
>> Nitpick: The first code example has an extra `}` ;)
>>
>> On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan <marcin.kut...@gmail.com>
>> wrote:
>>
>>> As long as the Kafka producer is thread-safe, you don't need any pool at
>>> all. Just share a single producer on each executor. Please look at my blog
>>> post for more details. http://allegro.tech/spark-kafka-integration.html
>>>
>>> On 19 Aug 2015 2:00 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com>
>>> wrote:
>>>
>>>> All of you are right.
>>>>
>>>> I was trying to create too many producers. My idea was to create a
>>>> pool (for now the pool contains only one producer) shared by all the
>>>> executors.
>>>> After I realized it was related to serialization issues (though I
>>>> did not find clear clues in the source code indicating that the broadcast
>>>> type parameter must implement Serializable), I followed the Spark
>>>> Cassandra Connector design and created a singleton Kafka producer pool.
>>>> No exceptions have been noticed since.
>>>>
>>>> Thanks for all your comments.
>>>>
>>>>
>>>> On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> Why are you even trying to broadcast a producer? A broadcast variable
>>>>> is some immutable piece of serializable DATA that can be used for
>>>>> processing on the executors. A Kafka producer is neither DATA nor
>>>>> immutable, and definitely not serializable.
>>>>> The right way to do this is to create the producer in the executors.
>>>>> Please see the discussion in the programming guide:
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>>
>>>>> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I wouldn't expect a Kafka producer to be serializable at all; among
>>>>>> other things, it has a background thread.
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>>>>>> wansheng...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Has anyone seen a java.util.ConcurrentModificationException when using
>>>>>>> broadcast variables?
>>>>>>> I encountered this exception when wrapping a Kafka producer like
>>>>>>> this in the Spark Streaming driver.
>>>>>>>
>>>>>>> Here is what I did.
>>>>>>> KafkaProducer<String, String> producer =
>>>>>>>     new KafkaProducer<String, String>(properties);
>>>>>>> final Broadcast<KafkaProducer<String, String>> bCastProducer =
>>>>>>>     streamingContext.sparkContext().broadcast(producer);
>>>>>>>
>>>>>>> Then, within a closure called by foreachRDD, I was trying to get
>>>>>>> the wrapped producer, i.e.
>>>>>>> KafkaProducer<String, String> p = bCastProducer.value();
>>>>>>>
>>>>>>> After rebuilding and rerunning, I got this stack trace:
>>>>>>>
>>>>>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException:
>>>>>>> java.util.ConcurrentModificationException
>>>>>>> Serialization trace:
>>>>>>> classes (sun.misc.Launcher$AppClassLoader)
>>>>>>> classloader (java.security.ProtectionDomain)
>>>>>>> context (java.security.AccessControlContext)
>>>>>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>>>>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>>>>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>>>>>> producer ("my driver")
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>>>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>>>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>>>>>>     at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>>>>>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>>>>>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>>>>>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>>>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>>>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>>>>>>     at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>>>>>>     at "my driver"
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>>> Caused by: java.util.ConcurrentModificationException
>>>>>>>     at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>>>>>>     at java.util.Vector$Itr.next(Vector.java:1133)
>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>>     ... 41 more
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shenghua (Daniel) Wan
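Here is a minimal Java sketch of the approach recommended in this thread: broadcast only the configuration needed to build the producer, then create the producer lazily on the executor side. It is not the code from Marcin's blog post. `LazyProducerSketch`, `LazyProducerHolder`, `FakeProducer` and `roundTrip` are hypothetical names. `FakeProducer` stands in for `KafkaProducer<String, String>` so the example runs without Kafka or Spark on the classpath. A Java-serialization round trip stands in for Spark shipping a broadcast value to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyProducerSketch {

    /** Hypothetical stand-in for KafkaProducer<String, String>; deliberately NOT Serializable. */
    public static final class FakeProducer {
        public static final AtomicInteger CREATED = new AtomicInteger();

        public FakeProducer(Properties config) {
            // A real producer would read its settings from config and start an I/O thread.
            CREATED.incrementAndGet();
        }
    }

    /**
     * Serializable holder: only the configuration is shipped. The producer field is
     * transient, so it is never serialized; each JVM that deserializes the holder
     * creates its own producer on the first get() call and reuses it afterwards.
     */
    public static final class LazyProducerHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Properties config;                  // plain, serializable data
        private transient volatile FakeProducer producer; // live object, never shipped

        public LazyProducerHolder(Properties config) {
            this.config = config;
        }

        public FakeProducer get() {
            FakeProducer result = producer;
            if (result == null) {
                synchronized (this) { // double-checked locking: tasks may call get() concurrently
                    result = producer;
                    if (result == null) {
                        result = new FakeProducer(config);
                        producer = result;
                    }
                }
            }
            return result;
        }

        public boolean isCreated() {
            return producer != null;
        }
    }

    /** Simulates shipping a value to an executor: serialize it, then deserialize a copy. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed example value

        // "Driver": build the holder from configuration only; no producer exists yet.
        LazyProducerHolder driverHolder = new LazyProducerHolder(props);

        // "Executor": the deserialized copy creates its producer on first use, then reuses it.
        LazyProducerHolder executorHolder = roundTrip(driverHolder);
        FakeProducer first = executorHolder.get();
        FakeProducer second = executorHolder.get();

        System.out.println("driver created producer: " + driverHolder.isCreated());
        System.out.println("same instance on executor: " + (first == second));
        System.out.println("producers created: " + FakeProducer.CREATED.get());
    }
}
```

In an actual job, you would broadcast the holder instead of the producer, for example `sparkContext().broadcast(new LazyProducerHolder(properties))`. You would then call `bCast.value().get()` inside `foreachPartition`, with a `KafkaProducer<String, String>` built from the same `Properties` in place of `FakeProducer`.

Spark typically deserializes a broadcast value once per executor and shares it among that executor's tasks. That sharing is why `get()` is synchronized. If you need a strict one-per-JVM guarantee, a static singleton (the Spark Cassandra Connector style Daniel mentions) is the alternative.

The job in this thread broadcasts with Kryo, so check that your serializer skips transient fields, as Java serialization does.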