Kryo serialization for closures: a workaround

2014-05-24 Thread Nilesh
Suppose my mappers are functions (defs) that internally call other classes,
create objects, and do various things. (They could even be classes that
extend (Foo) => Bar and do the processing in their apply method, but let's
ignore that case for now.)

Spark supports only Java serialization for closures: it forces every class
referenced inside the closure to implement Serializable, and it throws errors
if you try to configure Kryo as the closure serializer. But one cannot expect
every third-party library to make all of its classes Serializable!
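
(For context, Kryo can already be enabled for data serialization; the sketch
below is illustrative, with a made-up app name. Even with this setting,
closures are still serialized with Java serialization.)

    import org.apache.spark.{SparkConf, SparkContext}

    // Kryo handles RDD records and shuffle data once this is set, but the
    // closures themselves still go through Java serialization.
    val conf = new SparkConf()
      .setAppName("kryo-data-serialization")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)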

Here's a workaround that I thought I'd share in case anyone comes across
this problem:

You simply serialize the objects before passing them into the closure and
deserialize them afterwards. This approach just works, even if your classes
aren't Serializable, because it uses Kryo behind the scenes. All you need is
a little currying. ;) Here's an example of how I did it:

    def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
                 (foo: Foo): Bar = {
      kryoWrapper.value.apply(foo)
    }

    // Blah is the (possibly non-Serializable) function you actually want to run.
    class Blah(abc: ABC) extends (Foo => Bar) {
      def apply(foo: Foo): Bar = { // this is the real function
      }
    }

    val mapper = genMapper(KryoSerializationWrapper(new Blah(abc)))
    rdd.flatMap(mapper).collectAsMap()
Feel free to make Blah as complicated as you want, class, companion object,
nested classes, references to multiple 3rd party libs.

KryoSerializationWrapper refers to this wrapper from amplab/shark:
https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
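
If you don't want to chase the link, here is a rough sketch of what such a
wrapper can look like, written directly against the Kryo API. It is
illustrative only: the shark implementation differs in detail, and a real
version would also configure class registration and an instantiator strategy.

    import java.io.ByteArrayOutputStream

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    // A Java-serializable envelope whose payload travels as Kryo bytes, so the
    // wrapped value itself never has to be java.io.Serializable.
    class KryoSerializationWrapper[T](@transient private var obj: T)
      extends Serializable {

      private var bytes: Array[Byte] = _

      // Lazily rebuild the wrapped value from the Kryo bytes after the wrapper
      // has travelled through Java serialization.
      def value: T = {
        if (bytes != null) {
          val kryo = new Kryo()
          obj = kryo.readClassAndObject(new Input(bytes)).asInstanceOf[T]
          bytes = null
        }
        obj
      }

      private def writeObject(out: java.io.ObjectOutputStream): Unit = {
        val kryo = new Kryo()
        val baos = new ByteArrayOutputStream()
        val output = new Output(baos)
        kryo.writeClassAndObject(output, obj)  // Kryo serializes the payload
        output.close()
        bytes = baos.toByteArray
        out.defaultWriteObject()               // Java serialization ships only the bytes
      }

      private def readObject(in: java.io.ObjectInputStream): Unit = {
        in.defaultReadObject()
      }
    }

    object KryoSerializationWrapper {
      def apply[T](obj: T): KryoSerializationWrapper[T] =
        new KryoSerializationWrapper(obj)
    }

Because genMapper is curried, the task closure only captures the wrapper, so
Java serialization only ever sees the wrapper and its byte array.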
  

Don't you think it's a good idea to have something like this inside the
framework itself? :)




Re: Kryo serialization for closures: a workaround

2014-05-24 Thread Reynold Xin
Thanks for sending this in.

The ASF list doesn't support HTML, so the formatting of the code is a little
messed up. For those who want to see the code in clearly formatted text, go
to
http://apache-spark-developers-list.1001551.n3.nabble.com/Kryo-serialization-for-closures-a-workaround-tp6787.html





Re: No output from Spark Streaming program with Spark 1.0

2014-05-24 Thread Jim Donahue
I looked at the Streaming UI for my job and it reports that it has
processed many batches, but that none of the batches had any records in
them. Unfortunately, that’s what I expected.  :-(

I’ve tried multiple test programs and I’m seeing the same thing.  The
Kafka sources are alive and well and the programs all worked on 0.9 from
Eclipse.  And there’s no indication of any failure — just no records are
being delivered.

Any ideas would be much appreciated …


Thanks,

Jim


On 5/23/14, 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

A few more suggestions.
1. Check the web UI: is the system running any jobs? If not, you may need to
give the system more nodes. Basically, the system should have more cores
than the number of receivers (see the sketch after this list).
2. There is also a streaming-specific web UI which gives more
streaming-specific data.
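
(A minimal sketch of what "more cores than receivers" means when running
locally, e.g. from Eclipse. The master URL, app name and batch interval here
are illustrative assumptions, not taken from Jim's actual setup.)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // A receiver (e.g. the Kafka receiver) permanently occupies one core, so the
    // application needs at least (number of receivers + 1) cores, otherwise no
    // batch ever gets processed even though data is being received.
    val conf = new SparkConf()
      .setMaster("local[2]")              // 1 core for the receiver + 1 for processing
      .setAppName("streaming-core-check") // hypothetical app name
    val ssc = new StreamingContext(conf, Seconds(10))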


On Fri, May 23, 2014 at 6:02 PM, Patrick Wendell pwend...@gmail.com
wrote:

 Also, one other thing to try: remove all of the logic from inside of
 foreach and just print something (e.g. as sketched below). It could be that
 an exception is somehow being triggered inside of your foreach block and as
 a result the output goes away.
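
 (A minimal sketch of that bare-bones check, reusing the msgstream DStream
 from Jim's code quoted further down; the printed text is arbitrary.)

     msgstream.foreach(rdd => {
       // no real logic: just prove that batches arrive and this block runs at all
       println("batch at " + new java.util.Date() + ": " + rdd.count() + " records")
     })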

 On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell pwend...@gmail.com
 wrote:
  Hey Jim,
 
  Do you see the same behavior if you run this outside of Eclipse?

  Also, if you print something to standard out when setting up your streams
  (i.e. not inside of the foreach), do you see that? This could be a
  streaming issue, but it could also be something related to the way it's
  running in Eclipse.
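
  (For example, a trivially hypothetical line placed right after the streams
  are built and before ssc.start():)

      println("streams set up, starting ssc at " + new java.util.Date())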
 
  - Patrick
 
  On Fri, May 23, 2014 at 2:57 PM, Jim Donahue jdona...@adobe.com
wrote:
   I'm trying out 1.0 on a set of small Spark Streaming tests and am running
   into problems. Here's one of the little programs I've used for a long
   time: it reads a Kafka stream that contains Twitter JSON tweets and does
   some simple counting. The program starts OK (it connects to the Kafka
   stream fine) and generates a stream of INFO logging messages, but never
   generates any output. :-(

   I'm running this in Eclipse, so there may be some class loading issue
   (loading the wrong class or something like that), but I'm not seeing
   anything in the console output.
 
  Thanks,
 
  Jim Donahue
  Adobe
 
 
 
   val kafka_messages =
     KafkaUtils.createStream[Array[Byte], Array[Byte],
       kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](
       ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)

   val messages = kafka_messages.map(_._2)

   val total = ssc.sparkContext.accumulator(0)

   val startTime = new java.util.Date().getTime()

   val jsonstream = messages.map[JSONObject](message => {
     val string = new String(message)
     val json = new JSONObject(string)
     total += 1
     json
   })

   val deleted = ssc.sparkContext.accumulator(0)

   val msgstream = jsonstream.filter(json =>
     if (!json.has("delete")) true else { deleted += 1; false }
   )

   msgstream.foreach(rdd => {
     if (rdd.count() > 0) {
       val data = rdd.map(json => (json.has("entities"), json.length())).collect()
       val entities: Double = data.count(t => t._1)
       val fieldCounts = data.sortBy(_._2)
       val minFields = fieldCounts(0)._2
       val maxFields = fieldCounts(fieldCounts.size - 1)._2
       val now = new java.util.Date()
       val interval = (now.getTime() - startTime) / 1000
       System.out.println(now.toString)
       System.out.println("processing time: " + interval + " seconds")
       System.out.println("total messages: " + total.value)
       System.out.println("deleted messages: " + deleted.value)
       System.out.println("message receipt rate: " + (total.value / interval) + " per second")
       System.out.println("messages this interval: " + data.length)
       System.out.println("message fields varied between: " + minFields + " and " + maxFields)
       System.out.println("fraction with entities is " + (entities / data.length))
     }
   })

   ssc.start()
 




Re: No output from Spark Streaming program with Spark 1.0

2014-05-24 Thread Tathagata Das
What does the Kafka receiver status on the streaming UI say when you are
connected to the Kafka sources? Does it show any error?

Can you find out which machine the receiver is running on and check that
worker's logs for any exceptions / error messages? Try turning on the DEBUG
level in log4j.
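
(A minimal, hypothetical sketch of raising the log level programmatically,
which can be easier than editing log4j.properties when running from Eclipse;
the logger name is just the usual Spark package prefix.)

    import org.apache.log4j.{Level, Logger}

    // Turn Spark logging up to DEBUG so that receiver and block-handling
    // activity shows up in the console / worker logs.
    Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)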

TD