ConcurrentModificationException

2018-05-02 Thread ccherng
I have encountered the exception below running Spark 2.1.0 on EMR. The
exception is the same as the one reported in

Serialization of accumulators in heartbeats is not thread-safe
https://issues.apache.org/jira/browse/SPARK-17463

Pull requests were made and merged and that issue was marked as resolved,
but a commenter named Sunil said they were still encountering the problem
with Spark 2.0.2 on EMR. I am too. Should this issue be reopened?
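
A minimal standalone repro of the race that ticket describes (plain Java,
no Spark; the class and variable names are made up for illustration): one
thread keeps mutating an ArrayList while another Java-serializes it, which
trips the modCount check in ArrayList.writeObject, the same frame that
throws in the trace below.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class HeartbeatRace {
    public static void main(String[] args) throws Exception {
        List<Integer> accumulatorUpdates = new ArrayList<>();

        // "Task" thread: keeps appending updates, like a running task
        // mutating its accumulators.
        Thread task = new Thread(() -> {
            for (int i = 0; ; i++) {
                accumulatorUpdates.add(i);
                if (accumulatorUpdates.size() > 10000) {
                    accumulatorUpdates.clear();
                }
            }
        });
        task.setDaemon(true);
        task.start();

        // "Heartbeat" thread: repeatedly Java-serializes the same list.
        // ArrayList.writeObject verifies modCount after writing, so this
        // quickly dies with ConcurrentModificationException.
        while (true) {
            try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(accumulatorUpdates);
            }
        }
    }
}

The general cure for this class of race is to serialize a snapshot taken
under synchronization rather than a live, task-mutated collection.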


18/04/30 22:54:15 WARN NettyRpcEndpointRef: Error sending message [message =
Heartbeat(4229,[Lscala.Tuple2;@5e8fe6a5,BlockManagerId(4229,
ip-172-23-229-187.ec2.internal, 35905, None))] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at
org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at
java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-19 Thread HARSH TAKKAR
Thanks Cody,

It worked for me by keeping the number of executors, each with 1 core,
equal to the number of Kafka partitions.
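
Concretely, that sizing can be pinned in the SparkConf. A sketch under the
assumption of a hypothetical 12-partition topic (spark.executor.instances
is honored on YARN; the class name is made up):

import org.apache.spark.SparkConf;

public class OneCorePerExecutorConf {
    // One core per executor, and as many executors as Kafka partitions,
    // so each partition's cached KafkaConsumer is only ever used by a
    // single task thread at a time.
    public static SparkConf build() {
        return new SparkConf()
            .setAppName("kafka-direct-stream")
            .set("spark.executor.cores", "1")
            .set("spark.executor.instances", "12"); // = Kafka partition count
    }
}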



On Mon, Sep 18, 2017 at 8:47 PM Cody Koeninger  wrote:

> Have you searched in jira, e.g.
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR 
> wrote:
> > Hi
> >
> > Changing the Spark version is my last resort; is there any other
> > workaround for this problem?
> >
> >
> > On Mon, Sep 18, 2017 at 11:43 AM pandees waran 
> wrote:
> >>
> >> All, May I know what exactly changed in 2.1.1 which solved this problem?
> >>
> >> Sent from my iPhone
> >>
> >> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
> >> wrote:
> >>
> >> Hi,
> >>
> >> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
> >> solved my issue. Can you try with 2.1.1 as well and report back?
> >>
> >> Best,
> >> Anastasios
> >>
> >> On 17.09.2017 16:48, "HARSH TAKKAR" wrote:
> >>
> >>
> >> Hi
> >>
> >> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> >> partitions of each RDD in a DStream formed using KafkaUtils, I am
> >> getting the below exception. Please suggest a fix.
> >>
> >> I have the following config:
> >>
> >> kafka :
> >> enable.auto.commit:"true",
> >> auto.commit.interval.ms:"1000",
> >> session.timeout.ms:"3",
> >>
> >> Spark:
> >>
> >> spark.streaming.backpressure.enabled=true
> >>
> >> spark.streaming.kafka.maxRatePerPartition=200
> >>
> >>
> >> Exception in task 0.2 in stage 3236.0 (TID 77795)
> >> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access
> >>
> >> --
> >> Kind Regards
> >> Harsh
> >>
> >>
> >
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Cody Koeninger
Have you searched in jira, e.g.

https://issues.apache.org/jira/browse/SPARK-19185
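
For what it's worth, the workaround that comes out of that ticket's
discussion is disabling the consumer cache in spark-streaming-kafka-0-10,
trading a new consumer per partition per batch for never sharing a
KafkaConsumer between threads. A hedged sketch (the class name is made up):

import org.apache.spark.SparkConf;

public class DisableConsumerCache {
    // Avoids "KafkaConsumer is not safe for multi-threaded access" by
    // never reusing a cached consumer across tasks.
    public static SparkConf apply(SparkConf conf) {
        return conf.set("spark.streaming.kafka.consumer.cache.enabled", "false");
    }
}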

On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR  wrote:
> Hi
>
> Changing the Spark version is my last resort; is there any other
> workaround for this problem?
>
>
> On Mon, Sep 18, 2017 at 11:43 AM pandees waran  wrote:
>>
>> All, May I know what exactly changed in 2.1.1 which solved this problem?
>>
>> Sent from my iPhone
>>
>> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
>> wrote:
>>
>> Hi,
>>
>> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
>> solved my issue. Can you try with 2.1.1 as well and report back?
>>
>> Best,
>> Anastasios
>>
>> On 17.09.2017 16:48, "HARSH TAKKAR" wrote:
>>
>>
>> Hi
>>
>> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
>> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
>> the below exception. Please suggest a fix.
>>
>> I have the following config:
>>
>> kafka :
>> enable.auto.commit:"true",
>> auto.commit.interval.ms:"1000",
>> session.timeout.ms:"3",
>>
>> Spark:
>>
>> spark.streaming.backpressure.enabled=true
>>
>> spark.streaming.kafka.maxRatePerPartition=200
>>
>>
>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>>
>> --
>> Kind Regards
>> Harsh
>>
>>
>




Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread HARSH TAKKAR
Hi

Changing the Spark version is my last resort; is there any other workaround
for this problem?


On Mon, Sep 18, 2017 at 11:43 AM pandees waran  wrote:

> All, May I know what exactly changed in 2.1.1 which solved this problem?
>
> Sent from my iPhone
>
> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
> wrote:
>
> Hi,
>
> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
> solved my issue. Can you try with 2.1.1 as well and report back?
>
> Best,
> Anastasios
>
> On 17.09.2017 16:48, "HARSH TAKKAR" wrote:
>
>
> Hi
>
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the below exception. Please suggest a fix.
>
> I have the following config:
>
> kafka :
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
>
> Spark:
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.kafka.maxRatePerPartition=200
>
>
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
>
> --
> Kind Regards
> Harsh
>
>
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread pandees waran
All, May I know what exactly changed in 2.1.1 which solved this problem?

Sent from my iPhone

> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias  wrote:
> 
> Hi,
> 
> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 
> solved my issue. Can you try with 2.1.1 as well and report back?
> 
> Best,
> Anastasios
> 
> On 17.09.2017 16:48, "HARSH TAKKAR" wrote:
> 
> Hi 
> 
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the below exception. Please suggest a fix.
>
> I have the following config:
> 
> kafka : 
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
> 
> Spark: 
> spark.streaming.backpressure.enabled=true
> 
> spark.streaming.kafka.maxRatePerPartition=200
> 
> 
> 
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> 
> --
> Kind Regards
> Harsh 
> 


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Anastasios Zouzias
Hi,

I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
solved my issue. Can you try with 2.1.1 as well and report back?

Best,
Anastasios

On 17.09.2017 16:48, "HARSH TAKKAR" wrote:


Hi

I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
partitions of each RDD in a DStream formed using KafkaUtils, I am getting
the below exception. Please suggest a fix.

I have the following config:

kafka :
enable.auto.commit:"true",
auto.commit.interval.ms:"1000",
session.timeout.ms:"3",

Spark:

spark.streaming.backpressure.enabled=true

spark.streaming.kafka.maxRatePerPartition=200


Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access

--
Kind Regards
Harsh


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread kant kodali
You should paste some code. ConcurrentModificationException normally
happens when you modify a list or any other non-thread-safe data structure
while you are iterating over it.
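
A minimal single-threaded example of exactly that failure mode (plain
Java, unrelated to Spark or Kafka; names are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CmeDemo {
    public static void main(String[] args) {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b", "c"));
        // The for-each loop holds an Iterator over items; removing an
        // element through the list (not through the iterator) bumps
        // modCount behind the iterator's back, so the next iteration
        // step throws java.util.ConcurrentModificationException.
        for (String s : items) {
            if (s.equals("a")) {
                items.remove(s);
            }
        }
    }
}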

On Sun, Sep 17, 2017 at 10:25 PM, HARSH TAKKAR <takkarha...@gmail.com>
wrote:

> Hi,
>
> No, we are not creating any threads for the Kafka DStream.
> However, we have a single thread for refreshing a resource cache on the
> driver, but that is totally separate from this connection.
>
> On Mon, Sep 18, 2017 at 12:29 AM kant kodali <kanth...@gmail.com> wrote:
>
>> Are you creating threads in your application?
>>
>> On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR <takkarha...@gmail.com>
>> wrote:
>>
>>>
>>> Hi
>>>
>>> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
>>> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
>>> the below exception. Please suggest a fix.
>>>
>>> I have the following config:
>>>
>>> kafka :
>>> enable.auto.commit:"true",
>>> auto.commit.interval.ms:"1000",
>>> session.timeout.ms:"3",
>>>
>>> Spark:
>>>
>>> spark.streaming.backpressure.enabled=true
>>>
>>> spark.streaming.kafka.maxRatePerPartition=200
>>>
>>>
>>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe
>>> for multi-threaded access
>>>
>>> --
>>> Kind Regards
>>> Harsh
>>>
>>
>>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi,

No, we are not creating any threads for the Kafka DStream.
However, we have a single thread for refreshing a resource cache on the
driver, but that is totally separate from this connection.

On Mon, Sep 18, 2017 at 12:29 AM kant kodali  wrote:

> Are you creating threads in your application?
>
> On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR 
> wrote:
>
>>
>> Hi
>>
>> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
>> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
>> the below exception. Please suggest a fix.
>>
>> I have the following config:
>>
>> kafka :
>> enable.auto.commit:"true",
>> auto.commit.interval.ms:"1000",
>> session.timeout.ms:"3",
>>
>> Spark:
>>
>> spark.streaming.backpressure.enabled=true
>>
>> spark.streaming.kafka.maxRatePerPartition=200
>>
>>
>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>>
>> --
>> Kind Regards
>> Harsh
>>
>
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread kant kodali
Are you creating threads in your application?

On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR  wrote:

>
> Hi
>
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the below exception. Please suggest a fix.
>
> I have the following config:
>
> kafka :
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
>
> Spark:
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.kafka.maxRatePerPartition=200
>
>
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
>
> --
> Kind Regards
> Harsh
>


ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi

I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
partitions of each RDD in a DStream formed using KafkaUtils, I am getting
the below exception. Please suggest a fix.

I have the following config:

kafka :
enable.auto.commit:"true",
auto.commit.interval.ms:"1000",
session.timeout.ms:"3",

Spark:

spark.streaming.backpressure.enabled=true

spark.streaming.kafka.maxRatePerPartition=200


Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access

--
Kind Regards
Harsh


Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Marcin Kuthan
I'm glad that I could help :)
On 19 Aug 2015 at 8:52 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
wrote:

 +1

 I wish I had read this blog earlier. I am using Java and have just
 implemented a singleton producer per executor/JVM earlier today.
 Yes, I did see that NonSerializableException when I was debugging the code
 ...

 Thanks for sharing.

 On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com
 wrote:

 It's a cool blog post! Tweeted it!
 Broadcasting the configuration necessary for lazily instantiating the
 producer is a good idea.

 Nitpick: The first code example has an extra `}` ;)

 On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
 wrote:

 As long as the Kafka producer is thread-safe you don't need any pool at
 all. Just share a single producer on every executor. Please look at my blog
 post for more details. http://allegro.tech/spark-kafka-integration.html
 On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serializability issues (though I
 did not find clear clues in the source code indicating that the broadcast
 type parameter must implement Serializable), I followed the
 spark-cassandra-connector design and created a singleton of Kafka producer
 pools. No exception has been noticed since.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable
 is some immutable piece of serializable DATA that can be used for
 processing on the executors. A Kafka producer is neither DATA nor
 immutable, and definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like
 this in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get
 the wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Shenghua(Daniel) Wan
+1

I wish I had read this blog earlier. I am using Java and have just
implemented a singleton producer per executor/JVM earlier today.
Yes, I did see that NonSerializableException when I was debugging the code
...

Thanks for sharing.

On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com wrote:

 It's a cool blog post! Tweeted it!
 Broadcasting the configuration necessary for lazily instantiating the
 producer is a good idea.

 Nitpick: The first code example has an extra `}` ;)

 On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
 wrote:

 As long as the Kafka producer is thread-safe you don't need any pool at all.
 Just share a single producer on every executor. Please look at my blog post
 for more details. http://allegro.tech/spark-kafka-integration.html
 On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serializability issues (though I did
 not find clear clues in the source code indicating that the broadcast
 type parameter must implement Serializable), I followed the
 spark-cassandra-connector design and created a singleton of Kafka producer
 pools. No exception has been noticed since.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable
 is some immutable piece of serializable DATA that can be used for
 processing on the executors. A Kafka producer is neither DATA nor
 immutable, and definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get
 the wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
All of you are right.

I was trying to create too many producers. My idea was to create a pool (for
now the pool contains only one producer) shared by all the executors.
After I realized it was related to serializability issues (though I did
not find clear clues in the source code indicating that the broadcast
type parameter must implement Serializable), I followed the
spark-cassandra-connector design and created a singleton of Kafka producer
pools. No exception has been noticed since.

Thanks for all your comments.


On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
 org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at
 

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
Why are you even trying to broadcast a producer? A broadcast variable is
some immutable piece of serializable DATA that can be used for processing
on the executors. A Kafka producer is neither DATA nor immutable, and
definitely not serializable.
The right way to do this is to create the producer in the executors. Please
see the discussion in the programming guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
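
The shape that guide points at is to ship only serializable configuration
with the closure and build the producer lazily, once per executor JVM. A
rough Java sketch (the KafkaSink name and structure are made up for
illustration, not a Spark or Kafka API):

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSink {
    // One producer per JVM (i.e. per executor); never serialized.
    private static KafkaProducer<String, String> producer;

    public static synchronized KafkaProducer<String, String> getOrCreate(
            Map<String, Object> config) {
        if (producer == null) {
            // config must include bootstrap.servers and the key/value
            // serializer classes.
            producer = new KafkaProducer<>(config);
            // Flush buffered records when the executor JVM exits.
            Runtime.getRuntime().addShutdownHook(
                new Thread(() -> producer.close()));
        }
        return producer;
    }

    public static void send(Map<String, Object> config,
                            String topic, String value) {
        getOrCreate(config).send(new ProducerRecord<>(topic, value));
    }
}

Tasks inside foreachRDD/foreachPartition then call KafkaSink.send(...);
only the config map crosses the driver/executor boundary, never the
producer itself.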

On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
 org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
 at my driver
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Marcin Kuthan
As long as the Kafka producer is thread-safe you don't need any pool at all.
Just share a single producer on every executor. Please look at my blog post
for more details. http://allegro.tech/spark-kafka-integration.html
On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serializability issues (though I did
 not find clear clues in the source code indicating that the broadcast
 type parameter must implement Serializable), I followed the
 spark-cassandra-connector design and created a singleton of Kafka producer
 pools. No exception has been noticed since.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
It's a cool blog post! Tweeted it!
Broadcasting the configuration necessary for lazily instantiating the
producer is a good idea.

Nitpick: The first code example has an extra `}` ;)

On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
wrote:

 As long as the Kafka producer is thread-safe you don't need any pool at all.
 Just share a single producer on every executor. Please look at my blog post
 for more details. http://allegro.tech/spark-kafka-integration.html
 On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serializability issues (though I did
 not find clear clues in the source code indicating that the broadcast
 type parameter must implement Serializable), I followed the
 spark-cassandra-connector design and created a singleton of Kafka producer
 pools. No exception has been noticed since.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
Hi,
Did anyone see java.util.ConcurrentModificationException when using
broadcast variables?
I encountered this exception when wrapping a Kafka producer like this in
the spark streaming driver.

Here is what I did.
KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(properties);
final Broadcast<KafkaDataProducer> bCastProducer
= streamingContext.sparkContext().broadcast(producer);

Then within a closure called by foreachRDD, I was trying to get the
wrapped producer, i.e.
 KafkaProducer<String, String> p = bCastProducer.value();

after rebuilding and rerunning, I got a stack trace like this

Exception in thread "main" com.esotericsoftware.kryo.KryoException:
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer (my driver)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
at my driver
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Cody Koeninger
I wouldn't expect a kafka producer to be serializable at all... among other
things, it has a background thread

On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got a stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at
 org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
 at my driver
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.util.ConcurrentModificationException
 at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
 at