Thanks Josh... I'll take a look.
On 31 Aug 2015 19:21, "Josh Rosen" <> wrote:

> There are currently a few known issues with using KryoSerializer as the
> closure serializer, so it's going to require some changes to Spark if we
> want to properly support this. See
> and
> for some discussion of
> the difficulties here.
> On Mon, Aug 31, 2015 at 3:44 AM, yash datta <> wrote:
>> Hi devs,
>> Currently, the only supported serializer for serializing tasks in
>> DAGScheduler.scala is JavaSerializer:
>> val taskBinaryBytes: Array[Byte] = stage match {
>>   case stage: ShuffleMapStage =>
>>     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
>>   case stage: ResultStage =>
>>     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
>> }
>> taskBinary = sc.broadcast(taskBinaryBytes)
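For anyone following along, here is a minimal, stdlib-only Scala sketch of roughly what the ResultStage branch above amounts to when the closure serializer is JavaSerializer: the (rdd, func) pair is written as one object graph with java.io.ObjectOutputStream, and those bytes are what get broadcast as the task binary. FakeRdd and SizeOfPartition are hypothetical stand-ins, not Spark classes, and JavaSerializer adds its own stream wrapping around ObjectOutputStream, so treat this as an approximation only.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskBinarySketch {
  // Plain Java serialization of an arbitrary object graph into a byte array,
  // approximating what a JavaSerializer-backed closure serializer returns.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray()
  }

  // Hypothetical stand-in for stage.rdd; any Serializable object graph works.
  final case class FakeRdd(id: Int, data: Vector[Int])

  // Hypothetical stand-in for the per-partition result function. A case class
  // is used instead of a lambda so serializability does not depend on where
  // the function was defined.
  final case class SizeOfPartition() extends (Iterator[Int] => Int) {
    def apply(it: Iterator[Int]): Int = it.size
  }

  // Mirrors the ResultStage branch: serialize the (rdd, func) pair as one object.
  def resultTaskBinary(rdd: FakeRdd, func: Iterator[Int] => Int): Array[Byte] =
    javaSerialize((rdd, func): AnyRef)
}
```

The point of the sketch is that everything reachable from the RDD ends up in the task binary, so its size tracks the object graph, not just the function being run.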
>> Could somebody give me pointers as to what is involved if we want to
>> change it to KryoSerializer?
>> One suggestion here
>> was to use chill-scala's KryoSerializer
>> for closureSerializer:
>> private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
>> But on digging into the code, it looks like the KryoSerializer being used
>> already comes from the Twitter chill library.
>> In KryoSerializer.scala:
>> val instantiator = new EmptyScalaKryoInstantiator
>> val kryo = instantiator.newKryo()
>> ----------------------------------------
>> package com.twitter.chill
>> class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
>>   override def newKryo(): com.twitter.chill.KryoBase = { /* compiled code */ }
>> }
>> I am working on a low-latency job, and much of the time is spent
>> serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
>> Thoughts? Is this reasonable? I wanted to check whether switching to
>> KryoSerializer would help here.
>> I am serializing a UnionRDD, which is created by code like this
>> (rdds here is a list of SchemaRDDs):
>> val condition = 'column === indexValue
>> val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
>> val sddpp = => x.where(condition).select(selectFields: _*))
>> val rddpp = => new PartitionPruningRDD(x, partitioner.func))
>> val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)
>> My partitioner above selects one partition (out of 100) per RDD
>> from the list of RDDs passed to UnionRDD, and the resulting UnionRDD has
>> 127 partitions.
>> Calling unioned.collect() triggers serialization of the UnionRDD.
>> I am using Spark 1.2.1.
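A toy model may help show where the size comes from. A UnionRDD keeps references to all of its parent RDDs, so serializing it for the result stage also pulls in every PartitionPruningRDD and, through each one, that parent's own lineage, even when only one partition per parent survives pruning. The sketch below uses hypothetical classes (ToyParent, ToyPruned, ToyUnion), not Spark's, together with plain Java serialization. It reproduces the partition arithmetic from the message (127 parents with 100 partitions each, pruned to one apiece, give a 127-partition union) and shows the serialized size still growing with every parent. payloadBytes is an assumed placeholder for per-parent state such as a query plan; it makes no claim about what actually dominates the 2.8 MB.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object UnionSizeSketch {
  // Hypothetical parent RDD: payload stands in for whatever per-parent state
  // (plan, schema, lineage) gets serialized along with it.
  final case class ToyParent(id: Int, numPartitions: Int, payload: Array[Byte])

  // Hypothetical pruned RDD: keeps a reference to the whole parent plus the
  // indices of the partitions that survived the filter.
  final case class ToyPruned(parent: ToyParent, keep: Vector[Int])

  // Hypothetical union: holds every pruned parent.
  final case class ToyUnion(parents: Vector[ToyPruned]) {
    // Each surviving parent partition becomes one union partition.
    def numPartitions: Int = parents.map(_.keep.size).sum
  }

  def prune(parent: ToyParent, filter: Int => Boolean): ToyPruned =
    ToyPruned(parent, (0 until parent.numPartitions).filter(filter).toVector)

  def buildUnion(numParents: Int, payloadBytes: Int): ToyUnion = {
    val target = 42 // assumed index picked by the partitioner's filter function
    val parents = (0 until numParents).map { i =>
      prune(ToyParent(i, 100, new Array[Byte](payloadBytes)), _ == target)
    }
    ToyUnion(parents.toVector)
  }

  // Size in bytes of the Java-serialized object graph.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  // Median wall-clock time of `runs` serializations, in milliseconds.
  def medianSerializeMillis(obj: AnyRef, runs: Int = 11): Double = {
    val times = (1 to runs).map { _ =>
      val start = System.nanoTime()
      serializedSize(obj)
      (System.nanoTime() - start) / 1e6
    }.sorted
    times(runs / 2)
  }
}
```

Comparing serializedSize and medianSerializeMillis for buildUnion(10, n) versus buildUnion(127, n) shows the roughly linear growth. The same kind of measurement around the real closureSerializer.serialize call would indicate whether a different serializer or a smaller lineage is the more useful lever.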
>> Any help regarding this will be highly appreciated.
>> Best
>> Yash Datta
