Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Marcin Kuthan
I'm glad that I could help :) On 19 Aug 2015 8:52 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: +1 I wish I had read this blog earlier. I am using Java and have just implemented a singleton producer per executor/JVM during the day. Yes, I did see that NotSerializableException when

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Shenghua(Daniel) Wan
+1 I wish I had read this blog earlier. I am using Java and have just implemented a singleton producer per executor/JVM during the day. Yes, I did see that NotSerializableException when I was debugging the code ... Thanks for sharing. On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized it was related to the serialization issues (though I did not find clear clues in the source code to indicate the

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer
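Tathagata's message is cut off before it says where the producer should be created. One common pattern, assumed here rather than taken from the thread, is to create it inside the per-partition function that runs on the executor (in Spark, the body passed to something like `foreachPartition`). The sketch simulates that in plain Java: `PartitionProducer` is a hypothetical stand-in for `KafkaProducer`, and the partitions are ordinary lists, so no Spark or Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a Kafka producer: holds a live resource and must be closed.
class PartitionProducer implements AutoCloseable {
    static int created = 0; // how many instances have been constructed so far
    final List<String> sent = new ArrayList<>();

    PartitionProducer() { created++; }

    void send(String record) { sent.add(record); }

    @Override
    public void close() {
        // a real producer would flush and stop its background I/O thread here
    }
}

class PerPartitionSketch {
    // Models the function handed to a per-partition action: it runs on the executor,
    // so the producer is built there and is never serialized on the driver.
    static List<String> writePartition(Iterator<String> records) {
        try (PartitionProducer producer = new PartitionProducer()) {
            while (records.hasNext()) {
                producer.send(records.next());
            }
            return producer.sent;
        }
    }

    public static void main(String[] args) {
        // Two hypothetical partitions of one batch.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        for (List<String> partition : partitions) {
            System.out.println(writePartition(partition.iterator()));
        }
        System.out.println("producers created: " + PartitionProducer.created); // 2
    }
}
```

This never ships the producer across the wire, but it constructs one producer per partition per batch; Marcin's reply in this thread argues for sharing a single producer per executor instead.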

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Marcin Kuthan
As long as the Kafka producer is thread-safe, you don't need any pool at all. Just share a single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html On 19 Aug 2015 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: All of
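A minimal sketch of "one producer per executor", in plain Java rather than the Scala code of the linked post: the initialization-on-demand holder idiom yields one lazily created instance per JVM, and since an executor is one JVM, every task thread on it gets the same object. `SharedProducer` and `ProducerHolder` are hypothetical names, and the stand-in only mimics the property Marcin relies on (the `KafkaProducer` Javadoc describes the real producer as thread-safe and recommends sharing one instance across threads).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-safe stand-in for a Kafka producer.
class SharedProducer {
    final Queue<String> sent = new ConcurrentLinkedQueue<>();
    void send(String record) { sent.add(record); }
}

// One instance per JVM, created on first use. The JVM initializes Holder exactly once,
// thread-safely, the first time get() touches Holder.INSTANCE.
final class ProducerHolder {
    private ProducerHolder() {}

    private static class Holder {
        static final SharedProducer INSTANCE = new SharedProducer();
    }

    static SharedProducer get() { return Holder.INSTANCE; }
}

class SingletonSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService tasks = Executors.newFixedThreadPool(4); // stands in for task threads
        List<Future<SharedProducer>> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            final String record = "record-" + i;
            results.add(tasks.submit(() -> {
                SharedProducer p = ProducerHolder.get(); // first caller triggers creation
                p.send(record);
                return p;
            }));
        }
        boolean allSame = true;
        for (Future<SharedProducer> f : results) {
            allSame &= f.get() == ProducerHolder.get();
        }
        tasks.shutdown();
        tasks.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(allSame);                          // true
        System.out.println(ProducerHolder.get().sent.size()); // 8
    }
}
```

The sketch leaves out shutdown; an executor-wide producer would also need to be flushed and closed at some point, for example from a JVM shutdown hook.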

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
It's a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea. Nitpick: The first code example has an extra `}` ;) On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com wrote: As long as the Kafka producer is
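Tathagata's summary, broadcast the configuration and create the producer lazily, can be sketched as a serializable wrapper whose only serialized state is the config map. The producer field is `transient`, so it is dropped in transit and rebuilt on first use in whatever JVM deserializes the wrapper. This is a plain-Java approximation, not the blog's code: `ConfiguredProducer` and `LazyProducerSink` are hypothetical names, and `roundTrip` uses Java serialization only to imitate shipping a broadcast value to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical non-serializable stand-in for a Kafka producer built from a config map.
class ConfiguredProducer {
    final Map<String, String> config;
    ConfiguredProducer(Map<String, String> config) { this.config = config; }
    String send(String record) { return config.get("topic") + ":" + record; }
}

// Only the config is serialized; the producer is transient and created lazily.
class LazyProducerSink implements Serializable {
    private static final long serialVersionUID = 1L;
    private final HashMap<String, String> config;
    private transient ConfiguredProducer producer; // skipped by serialization

    LazyProducerSink(Map<String, String> config) { this.config = new HashMap<>(config); }

    synchronized ConfiguredProducer producer() {
        if (producer == null) {
            producer = new ConfiguredProducer(config); // built in the receiving JVM
        }
        return producer;
    }

    synchronized boolean isInitialized() { return producer != null; }

    String send(String record) { return producer().send(record); }
}

class LazySinkSketch {
    // Serialize and deserialize, standing in for sending a broadcast value to an executor.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("topic", "events"); // hypothetical setting
        LazyProducerSink executorSide = roundTrip(new LazyProducerSink(config));
        System.out.println(executorSide.isInitialized()); // false
        System.out.println(executorSide.send("hello"));   // events:hello
        System.out.println(executorSide.isInitialized()); // true
    }
}
```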

broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the Spark Streaming driver. Here is what I did. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Cody Koeninger
I wouldn't expect a Kafka producer to be serializable at all... among other things, it has a background thread. On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I
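Cody's point can be checked without Kafka: Java serialization fails on any object graph that reaches a `java.lang.Thread`, because `Thread` does not implement `Serializable`, and declaring the owning class `Serializable` does not change that. `ThreadOwningClient` is a hypothetical stand-in for a client that owns a background I/O thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in: claims to be Serializable but owns a background thread.
class ThreadOwningClient implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Thread ioThread = new Thread(() -> {
        // a real client would poll the network here
    });
}

class SerializationCheck {
    // Returns null if the value serializes, otherwise the NotSerializableException message.
    static String trySerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the name of the class that could not be written
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize("plain config string"));    // null
        System.out.println(trySerialize(new ThreadOwningClient())); // java.lang.Thread
    }
}
```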