Thanks, Josh. I'll take a look.
On 31 Aug 2015 19:21, "Josh Rosen" <rosenvi...@gmail.com> wrote:

> There are currently a few known issues with using KryoSerializer as the
> closure serializer, so it's going to require some changes to Spark if we
> want to properly support this. See
> https://github.com/apache/spark/pull/6361 and
> https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
> the difficulties here.
>
> On Mon, Aug 31, 2015 at 3:44 AM, yash datta <sau...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Currently, the only supported serializer for serializing tasks in
>> DAGScheduler.scala is JavaSerializer.
>>
>>
>> val taskBinaryBytes: Array[Byte] = stage match {
>>   case stage: ShuffleMapStage =>
>>     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
>>   case stage: ResultStage =>
>>     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
>> }
>>
>> taskBinary = sc.broadcast(taskBinaryBytes)
>>
>>
>> Could somebody give me pointers on what is involved if we want to
>> change it to KryoSerializer?
>>
>>
>>
>> One suggestion, from
>>
>> http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html
>>
>> was to use chill-scala's KryoSerializer for closureSerializer:
>>
>> private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
>>
>>
>>
>> But on digging through the code, it looks like the KryoSerializer being
>> used already comes from the Twitter chill library.
>>
>> In KryoSerializer.scala:
>>
>> val instantiator = new EmptyScalaKryoInstantiator
>> val kryo = instantiator.newKryo()
>>
>> ----------------------------------------
>>
>> package com.twitter.chill
>> class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
>>   override def newKryo(): com.twitter.chill.KryoBase = { /* compiled code */ }
>> }
>>
>>
>>
>> I am working on a low-latency job, and much of the time is spent
>> serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
>> Thoughts? Is this reasonable? I wanted to check whether switching to
>> KryoSerializer would help here.
>>
>> I am serializing a UnionRDD, which is created by code like this
>> (rdds here is a list of SchemaRDDs):
>>
>>
>> val condition = 'column === indexValue
>>
>> val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
>>
>> val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))
>>
>>
>>
>> val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))
>>
>>
>> val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)
>>
>>
>> My partitioner above selects one partition (out of 100) per RDD from the
>> list of RDDs passed to UnionRDD, and the resulting UnionRDD has
>> 127 partitions.
>>
>> Calling unioned.collect leads to serialization of UnionRDD.
>>
>> I am using Spark 1.2.1.
>>
>>
>> Any help regarding this will be highly appreciated.
>>
>>
>> Best
>> Yash Datta
>>
>>
>> --
>> When events unfold with calm and ease
>> When the winds that blow are merely breeze
>> Learn from nature, from birds and bees
>> Live your life in love, and let joy not cease.
>>
>
>

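To put numbers like the ~140 ms and 2.8 MB quoted above in context, it may help to time plain Java serialization of an object graph outside Spark. The following is only a rough sketch under stated assumptions: it uses java.io.ObjectOutputStream directly (which is what Spark's JavaSerializer is built on), and the names SerializationCost and measure, as well as the Vector payload, are hypothetical stand-ins rather than anything from the Spark code base or the job described in the thread.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationCost {
  /** Serializes `value` with plain Java serialization.
    * Returns the serialized bytes and the elapsed time in nanoseconds. */
  def measure(value: AnyRef): (Array[Byte], Long) = {
    val start = System.nanoTime()
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    // close() flushes any buffered data into `buffer`
    try out.writeObject(value) finally out.close()
    val elapsed = System.nanoTime() - start
    (buffer.toByteArray, elapsed)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical payload; in the real job this would be the
    // (rdd, func) tuple that DAGScheduler serializes.
    val payload: AnyRef = Vector.tabulate(100000)(i => s"row-$i")
    measure(payload) // warm-up run so JIT compilation does not dominate
    val (bytes, nanos) = measure(payload)
    println(f"size = ${bytes.length / 1024.0}%.1f KiB, time = ${nanos / 1e6}%.2f ms")
  }
}
```

Running the same measurement on the actual (rdd, func) tuple would show how much of the cost comes from the size of the RDD lineage itself, independent of which serializer is eventually used.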