Kryo serialization for closures: a workaround
Suppose my mappers can be functions (def) that internally call other classes, create objects, and do different things inside. (Or they can even be classes that extend (Foo) => Bar and do the processing in their apply method -- but let's ignore this case for now.) Spark supports only Java serialization for closures, forces all the classes referenced inside to implement Serializable, and coughs up errors when asked to use Kryo for closures. But one cannot expect every 3rd-party library to have all of its classes extend Serializable!

Here's a workaround that I thought I'd share in case anyone comes across this problem: you simply need to serialize the objects before passing them through the closure, and de-serialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some curry. ;)

Here's an example of how I did it:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])(foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

class Blah(abc: ABC) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = {
    // This is the real function
  }
}

Feel free to make Blah as complicated as you want: a class, a companion object, nested classes, references to multiple 3rd-party libs. KryoSerializationWrapper refers to this wrapper from amplab/shark: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Don't you think it's a good idea to have something like this inside the framework itself? :)
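For readers who don't want to pull in the shark dependency, here is a rough sketch of the idea behind such a wrapper: the wrapper itself is Java-serializable, but it carries its payload as Kryo-encoded bytes, so the wrapped object never has to implement Serializable. This is not the shark code -- the field names, buffer sizes, and Kryo setup below are illustrative only.

import java.io.{ObjectInputStream, ObjectOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Sketch: a Serializable envelope whose payload is shipped as Kryo bytes.
class KryoSerializationWrapper[T](@transient private var obj: T) extends Serializable {

  // Kryo-encoded form of the wrapped object; filled in just before Java serialization.
  private var bytes: Array[Byte] = _

  def value: T = obj

  // In real code you would reuse or pool Kryo instances instead of creating one per call.
  private def newKryo(): Kryo = {
    val k = new Kryo()
    k.setRegistrationRequired(false) // accept unregistered classes for this sketch
    k
  }

  // Java serialization hook: encode the payload with Kryo, then write the byte array.
  private def writeObject(out: ObjectOutputStream): Unit = {
    val output = new Output(4096, -1)
    newKryo().writeClassAndObject(output, obj)
    bytes = output.toBytes
    out.defaultWriteObject()
  }

  // Java deserialization hook: read the byte array, then decode the payload with Kryo.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    obj = newKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
  }
}

object KryoSerializationWrapper {
  def apply[T](obj: T): KryoSerializationWrapper[T] = new KryoSerializationWrapper(obj)
}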
Re: Kryo serialization for closures: a workaround
Thanks for sending this in. The ASF list doesn't support HTML, so the formatting of the code is a little messed up. For those who want to see the code in clearly formatted text, go to http://apache-spark-developers-list.1001551.n3.nabble.com/Kryo-serialization-for-closures-a-workaround-tp6787.html
Re: No output from Spark Streaming program with Spark 1.0
I looked at the Streaming UI for my job and it reports that it has processed many batches, but that none of the batches had any records in them. Unfortunately, that's what I expected. :-( I've tried multiple test programs and I'm seeing the same thing. The Kafka sources are alive and well and the programs all worked on 0.9 from Eclipse. And there's no indication of any failure — just no records are being delivered. Any ideas would be much appreciated ... Thanks, Jim

On 5/23/14, 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Few more suggestions. 1. See the web UI: is the system running any jobs? If not, then you may need to give the system more nodes. Basically, the system should have more cores than the number of receivers. 2. Furthermore, there is a streaming-specific web UI which gives more streaming-specific data.

On Fri, May 23, 2014 at 6:02 PM, Patrick Wendell pwend...@gmail.com wrote:

Also, one other thing to try: try removing all of the logic from inside of foreach and just printing something. It could be that somehow an exception is being triggered inside of your foreach block and as a result the output goes away.

On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell pwend...@gmail.com wrote:

Hey Jim, Do you see the same behavior if you run this outside of Eclipse? Also, what happens if you print something to standard out when setting up your streams (i.e., not inside of the foreach), do you see that? This could be a streaming issue, but it could also be something related to the way it's running in Eclipse. - Patrick

On Fri, May 23, 2014 at 2:57 PM, Jim Donahue jdona...@adobe.com wrote:

I'm trying out 1.0 on a set of small Spark Streaming tests and am running into problems. Here's one of the little programs I've used for a long time — it reads a Kafka stream that contains Twitter JSON tweets and does some simple counting. The program starts OK (it connects to the Kafka stream fine) and generates a stream of INFO logging messages, but never generates any output. :-( I'm running this in Eclipse, so there may be some class loading issue (loading the wrong class or something like that), but I'm not seeing anything in the console output.
Thanks,

Jim Donahue
Adobe

val kafka_messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
  kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](
  ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)

val messages = kafka_messages.map(_._2)

val total = ssc.sparkContext.accumulator(0)
val startTime = new java.util.Date().getTime()

val jsonstream = messages.map[JSONObject](message => {
  val string = new String(message)
  val json = new JSONObject(string)
  total += 1
  json
})

val deleted = ssc.sparkContext.accumulator(0)

val msgstream = jsonstream.filter(json =>
  if (!json.has("delete")) true else { deleted += 1; false }
)

msgstream.foreach(rdd => {
  if (rdd.count() > 0) {
    val data = rdd.map(json => (json.has("entities"), json.length())).collect()
    val entities: Double = data.count(t => t._1)
    val fieldCounts = data.sortBy(_._2)
    val minFields = fieldCounts(0)._2
    val maxFields = fieldCounts(fieldCounts.size - 1)._2
    val now = new java.util.Date()
    val interval = (now.getTime() - startTime) / 1000
    System.out.println(now.toString)
    System.out.println("processing time: " + interval + " seconds")
    System.out.println("total messages: " + total.value)
    System.out.println("deleted messages: " + deleted.value)
    System.out.println("message receipt rate: " + (total.value / interval) + " per second")
    System.out.println("messages this interval: " + data.length)
    System.out.println("message fields varied between: " + minFields + " and " + maxFields)
    System.out.println("fraction with entities is " + (entities / data.length))
  }
})

ssc.start()
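One way to act on Patrick's suggestion above (strip the foreach body down to a bare print) is a sketch like the following. It reuses the kafka_messages stream from the program above and reports only the raw per-batch record count, so an exception in the JSON parsing, filtering, or counting logic cannot mask whether data is arriving at all.

kafka_messages.foreach(rdd => {
  // If this never prints a non-zero count, no records are reaching Spark at all;
  // if it does, the problem is somewhere in the parsing / filtering / output logic.
  System.out.println("batch at " + new java.util.Date() + " contained " +
    rdd.count() + " raw Kafka records")
})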
Re: No output from Spark Streaming program with Spark 1.0
What does the Kafka receiver status on the streaming UI say when you are connected to the Kafka sources? Does it show any error? Can you find out which machine the receiver is running on and check the worker logs for any exceptions / error messages? Try turning on the DEBUG level in log4j.

TD

On May 24, 2014 4:58 PM, Jim Donahue jdona...@adobe.com wrote:
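For the log4j part of that suggestion, one quick option is to raise the level from the driver program before starting the streaming context (editing conf/log4j.properties works just as well). This is a sketch using the plain log4j API; the package names are scoped to the streaming code so the output stays manageable.

import org.apache.log4j.{Level, Logger}

// Turn on DEBUG logging for the streaming machinery and the Kafka receiver only,
// rather than for all of Spark, to keep the console output readable.
Logger.getLogger("org.apache.spark.streaming").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.kafka").setLevel(Level.DEBUG)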