Hi devs,

Currently, the only serializer supported for serializing tasks in
DAGScheduler.scala is JavaSerializer:


val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
  case stage: ResultStage =>
    closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
}

taskBinary = sc.broadcast(taskBinaryBytes)


Could somebody give me pointers on what would be involved in changing it
to KryoSerializer?



One suggestion in this thread:

http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html

was to use chill-scala's KryoSerializer for closureSerializer:

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
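
For context, in Spark 1.x the class behind SparkEnv.get.closureSerializer is
chosen by the spark.closure.serializer property, which defaults to
org.apache.spark.serializer.JavaSerializer. A minimal sketch of switching it
is below. It assumes spark-core 1.x on the classpath; the object name and app
name are hypothetical, and the thread linked above reports a bug with this
setting, so it may well not work as-is:

```scala
import org.apache.spark.SparkConf

object ClosureSerializerConf {
  // Sketch only: the documentation for this property says only the Java
  // serializer is supported for closures, so treat this as an experiment.
  val conf: SparkConf = new SparkConf()
    .setAppName("closure-serializer-experiment") // hypothetical app name
    .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```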



But on digging into the code, it looks like the KryoSerializer already in
use comes from the Twitter chill library.

In KryoSerializer.scala:

val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()

----------------------------------------

package com.twitter.chill
class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
  override def newKryo() : com.twitter.chill.KryoBase = { /* compiled code */ }
}



I am working on a low-latency job, and much of the time is spent
serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
Is this reasonable? I wanted to check whether switching to KryoSerializer
would help here.
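
To make the measurement reproducible outside the scheduler, here is a minimal
sketch of timing plain Java serialization of an object and reporting its size.
It assumes closureSerializer behaves roughly like a bare ObjectOutputStream;
SerializationTiming and javaSerialize are hypothetical names for illustration,
not Spark APIs:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationTiming {
  // Serializes obj with plain Java serialization and returns the bytes
  // together with the elapsed wall-clock time in milliseconds.
  def javaSerialize(obj: AnyRef): (Array[Byte], Double) = {
    val start = System.nanoTime()
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val elapsedMs = (System.nanoTime() - start) / 1e6
    (buffer.toByteArray, elapsedMs)
  }
}
```

Passing the same (rdd, func) tuple that DAGScheduler serializes should give
figures comparable to the ~140 ms and 2.8 MB above, and calling it a few times
separates JVM warm-up from the steady-state cost.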

I am serializing a UnionRDD created by code like the following, where rdds
is a list of SchemaRDDs:


val condition = 'column === indexValue

val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields

val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))



val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))


val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)


The partitioner above selects one partition (out of 100) per RDD from the
list of RDDs passed to UnionRDD, and the resulting UnionRDD has
127 partitions.

Calling unioned.collect triggers serialization of the UnionRDD.

I am using Spark 1.2.1.


Any help regarding this will be highly appreciated.


Best
Yash Datta


-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.
