Hi Liz,

Do you have multiple producer clients that use the same Kryo serializer
objects? Each client will only have one background thread that tries to
call serialize(), and hence in that case you will have concurrent access.

Guozhang


On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett <ebenn...@loggly.com>
wrote:

> Hi Kafka Users,
> I have written my own implementation of the kafka Encoder class for
> serializing objects to Messages. It uses Kryo, which is a non-thread safe
> java serialization library. I'm using Kafka 0.7.2.
>
> We recently ran into an issue where we increased the number of kafka
> brokers for our kafka producer from 1 to 2. When we did this, we ran into
> exceptions that seemed related to Kryo being used concurrently by multiple
> threads. So, my question is, do I need to modify my Encoder class to be
> thread safe? I dug through the Kafka documentation and couldn't find
> anything that said one way or another. Any information would be great.
> Thank you!
>
> --Liz Bennett
>
> p.s. for what it's worth here is a stack trace from one of the exceptions
> we saw:
>
> 2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed
> to write 9 batched events to Kafka.
> com.esotericsoftware.kryo.KryoException:
> java.lang.ArrayIndexOutOfBoundsException: 40
> Serialization trace:
> fieldGroups (com.loggly.core.event.Event)
> event (com.loggly.core.event.FailedEvent)
>         at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>         at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
>         at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>         at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
>         at
> com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
>         at
> com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
>         at
> com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
>         at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
>         at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:660)
>         at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:43)
>         at
> kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>         at
> com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:291)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
>         at
> com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
>         at
> com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
>         at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
>         at
> com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
>         at
> com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
>         at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>         at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
>         at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
>         at
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
>         at
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
>         at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>         ... 40 more
>



-- 
-- Guozhang

Reply via email to