Why is there more than one job in a batch in Spark Streaming?

2015-09-24 Thread Shenghua(Daniel) Wan
Hi,
I noticed that in my streaming application, which reads from Kafka using
multiple receivers, there are 3 jobs in one batch (according to the web UI).
According to the DAG there are two stages: job 0 executes both stages, but
jobs 1 and 2 execute only stage 2. There is a disconnect between my
understanding and reality. I have gone over the book and done some googling,
but I still could not find the relationship between batches and jobs. Could
anyone share insights?
Thanks a lot!

-- 

Regards,
Shenghua (Daniel) Wan


Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Shenghua(Daniel) Wan
+1

I wish I had read this blog post earlier. I am using Java and have just
implemented a singleton producer per executor/JVM during the day.
Yes, I did see that NotSerializableException when I was debugging the code
...

Thanks for sharing.
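
For readers following along, a minimal sketch of the "singleton producer per executor/JVM" idea could look like the code below. It is not the code from my application. StubProducer is a hypothetical stand-in for KafkaProducer so that the example compiles without Kafka on the classpath, and the broker address is a placeholder.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.apache.kafka.clients.producer.KafkaProducer,
// used only so this sketch compiles without Kafka on the classpath.
final class StubProducer {
    // Counts how many producers were ever constructed in this JVM.
    static final AtomicInteger CREATED = new AtomicInteger();
    final Properties config;

    StubProducer(Properties config) {
        this.config = config;
        CREATED.incrementAndGet();
    }

    // Pretends to send a record and returns a description of it.
    String send(String topic, String value) {
        return topic + ":" + value;
    }
}

// Holds one lazily created producer per JVM, that is, per executor.
final class ProducerHolder {
    private static StubProducer instance;

    private ProducerHolder() {}

    // synchronized so that concurrent tasks in the same executor
    // cannot create two producers.
    static synchronized StubProducer getOrCreate(Properties config) {
        if (instance == null) {
            instance = new StubProducer(config);
        }
        return instance;
    }
}

class ProducerHolderDemo {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        StubProducer first = ProducerHolder.getOrCreate(config);
        StubProducer second = ProducerHolder.getOrCreate(config);

        System.out.println("same instance: " + (first == second));
        System.out.println("producers created: " + StubProducer.CREATED.get());
    }
}
```

In a real job, getOrCreate would be called inside the closure that runs on the executors, so the producer itself is never serialized. Only the configuration has to be shipped.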

On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com wrote:

 It's a cool blog post! Tweeted it!
 Broadcasting the configuration necessary for lazily instantiating the
 producer is a good idea.
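
 A rough sketch of that idea in plain Java, under stated assumptions: StubKafkaClient is a hypothetical stand-in for KafkaProducer, LazyClient and RoundTrip are illustrative names, and the serialization round trip only imitates what shipping a broadcast value to an executor would do. It is not the code from the blog post.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for KafkaProducer; deliberately NOT Serializable.
final class StubKafkaClient {
    final Properties config;

    StubKafkaClient(Properties config) {
        this.config = config;
    }
}

// Serializable wrapper: only the configuration travels over the wire,
// and the client is rebuilt lazily on first use.
final class LazyClient implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Properties config;
    // transient: skipped by serialization, so it is null after deserialization.
    private transient StubKafkaClient client;

    LazyClient(Properties config) {
        this.config = config;
    }

    synchronized StubKafkaClient get() {
        if (client == null) {
            client = new StubKafkaClient(config);
        }
        return client;
    }

    synchronized boolean isInitialized() {
        return client != null;
    }
}

final class RoundTrip {
    // Serializes and deserializes the wrapper, imitating a trip to an executor.
    static LazyClient copyViaSerialization(LazyClient original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClient) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

 The wrapper, not the producer, is what would be broadcast. Each deserialized copy creates its own client the first time get() is called.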

 Nitpick: The first code example has an extra `}` ;)

 On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
 wrote:

 As long as the Kafka producer is thread-safe, you don't need a pool at all.
 Just share a single producer on every executor. Please look at my blog post
 for more details: http://allegro.tech/spark-kafka-integration.html
 On 19 Aug 2015 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to serialization issues (though I did
 not find clear clues in the source code indicating that the broadcast type
 parameter must implement Serializable), I followed the Spark Cassandra
 connector design and created a singleton of Kafka producer pools. No
 exception has been observed since.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable
 is some immutable piece of serializable DATA that can be used for
 processing on the executors. A Kafka producer is neither DATA nor
 immutable, and definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get
 the wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 After rebuilding and rerunning, I got a stack trace like this:

 Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
All of you are right.

I was trying to create too many producers. My idea was to create a pool (for
now the pool contains only one producer) shared by all the executors.
After I realized it was related to serialization issues (though I did
not find clear clues in the source code indicating that the broadcast type
parameter must implement Serializable), I followed the Spark Cassandra
connector design and created a singleton of Kafka producer pools. No
exception has been observed since.

Thanks for all your comments.


On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote:

 Why are you even trying to broadcast a producer? A broadcast variable is
 some immutable piece of serializable DATA that can be used for processing
 on the executors. A Kafka producer is neither DATA nor immutable, and
 definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread
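
 To illustrate why an object that owns a thread cannot be shipped, here is a small self-contained sketch. It uses plain Java serialization rather than Kryo, so the failure mode differs from the trace in this thread, but the cause is similar: a Thread sits in the object graph. ThreadOwningClient and SerializationCheck are hypothetical names, not Kafka or Spark classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for a producer: like KafkaProducer, it owns a
// background thread (never started here).
final class ThreadOwningClient implements Serializable {
    private static final long serialVersionUID = 1L;
    // Thread is not Serializable, so this field makes the whole
    // object graph unserializable even though the class claims otherwise.
    private final Thread ioThread = new Thread(() -> {});
}

final class SerializationCheck {
    // Returns true if Java serialization can write the object,
    // false if some part of its object graph is not serializable.
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

 With these definitions, SerializationCheck.isJavaSerializable(new ThreadOwningClient()) returns false, while a java.util.Properties object holding the producer configuration serializes without trouble.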

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 After rebuilding and rerunning, I got a stack trace like this:

 Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85

broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 41 more

Thanks.

-- 

Regards,
Shenghua (Daniel) Wan