Liz,

Could you paste the code that makes the producer send call here? I just realized that in 0.7 there might be some corner cases in the calling pattern that cause concurrent access to the serializer.

Also, I would recommend trying out the newer version of Kafka (0.8.x), in which each producer has only one background thread for sending data, guaranteeing thread safety.

Guozhang

On Tue, Jan 13, 2015 at 11:40 AM, Elizabeth Bennett <ebenn...@loggly.com> wrote:

> Hi Guozhang,
> Thanks for your response. We've only got one producer client (per Kryo
> instance), but the producer client is configured (via the broker.list
> config) to produce to two Kafka brokers. When we create the Producer, we
> pass in an instance of the serializer. What if we used the serializer.class
> config to specify the class name of the serializer rather than pass in an
> instance? Would Kafka then create a separate serializer instance for each
> broker that it produces to? That would solve our problem, assuming that the
> Producer spawns new threads for each Kafka broker that it produces to,
> which I'm not sure about.
>
> --Liz
>
> On Mon, Jan 12, 2015 at 10:55 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Liz,
> >
> > Do you have multiple producer clients that use the same Kryo serializer
> > objects? Each client will only have one background thread that tries to
> > call serialize(), and hence in that case you will have concurrent access.
> >
> > Guozhang
> >
> > On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett <ebenn...@loggly.com>
> > wrote:
> >
> > > Hi Kafka Users,
> > > I have written my own implementation of the Kafka Encoder class for
> > > serializing objects to Messages. It uses Kryo, which is a
> > > non-thread-safe Java serialization library. I'm using Kafka 0.7.2.
> > >
> > > We recently ran into an issue where we increased the number of Kafka
> > > brokers for our Kafka producer from 1 to 2. When we did this, we ran
> > > into exceptions that seemed related to Kryo being used concurrently by
> > > multiple threads. So, my question is: do I need to modify my Encoder
> > > class to be thread safe?
> > > I dug through the Kafka documentation and couldn't find anything that
> > > said one way or another. Any information would be great.
> > > Thank you!
> > >
> > > --Liz Bennett
> > >
> > > p.s. for what it's worth, here is a stack trace from one of the
> > > exceptions we saw:
> > >
> > > 2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
> > > com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
> > > Serialization trace:
> > > fieldGroups (com.loggly.core.event.Event)
> > > event (com.loggly.core.event.FailedEvent)
> > >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> > >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> > >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> > >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> > >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> > >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
> > >     at com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
> > >     at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
> > >     at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
> > >     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> > >     at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > >     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> > >     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > >     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> > >     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> > >     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
> > >     at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> > >     at scala.collection.mutable.HashMap.map(HashMap.scala:43)
> > >     at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
> > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > >     at com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
> > >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:291)
> > >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
> > >     at com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
> > >     at com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
> > >     at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
> > >     at com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
> > >     at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> > >     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
> > >     at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> > >     at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
> > >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
> > >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
> > >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
> > >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> > >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> > >     ... 40 more
> >
> > --
> > -- Guozhang

--
-- Guozhang
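
For anyone following the thread: whichever way the producer's threading turns out to work, one common way to make an encoder safe around a non-thread-safe serializer such as Kryo is to give each calling thread its own serializer instance. What follows is only a rough sketch under stated assumptions, not the code from this thread. UnsafeSerializer is a hypothetical stand-in for Kryo (its unsynchronized class registry loosely mirrors the registration map seen in the "Caused by" part of the trace), and ThreadConfinedEncoder and encode are made-up names, not part of the Kafka or Kryo APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch: confine one non-thread-safe serializer to each thread via ThreadLocal.
final class ThreadConfinedEncoder {

    // Hypothetical stand-in for a non-thread-safe serializer such as Kryo.
    // It mutates an unsynchronized registry on first sight of each class.
    static final class UnsafeSerializer {
        private final Map<Class<?>, Integer> registry = new HashMap<>();

        byte[] serialize(Object value) {
            Integer id = registry.get(value.getClass());
            if (id == null) {
                // Implicit registration: unsafe if two threads share this object.
                id = registry.size();
                registry.put(value.getClass(), id);
            }
            return (id + ":" + value).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Each thread lazily creates, and then reuses, its own serializer.
    private static final ThreadLocal<UnsafeSerializer> SERIALIZER =
            ThreadLocal.withInitial(UnsafeSerializer::new);

    private ThreadConfinedEncoder() {
    }

    // Safe to call from any number of threads: no serializer state is shared.
    static byte[] encode(Object event) {
        return SERIALIZER.get().serialize(event);
    }
}
```

In a real encoder the ThreadLocal would presumably hold a configured Kryo instance, and the encoder's toMessage method would call SERIALIZER.get() rather than using a shared field. The trade-off is one serializer per thread in memory, in exchange for not needing any locking around serialization.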