Thanks Josh ... I'll take a look

On 31 Aug 2015 19:21, "Josh Rosen" <rosenvi...@gmail.com> wrote:
> There are currently a few known issues with using KryoSerializer as the
> closure serializer, so it's going to require some changes to Spark if we
> want to properly support this. See
> https://github.com/apache/spark/pull/6361 and
> https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
> the difficulties here.
>
> On Mon, Aug 31, 2015 at 3:44 AM, yash datta <sau...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Currently the only supported serializer for serializing tasks in
>> DAGScheduler.scala is JavaSerializer:
>>
>> val taskBinaryBytes: Array[Byte] = stage match {
>>   case stage: ShuffleMapStage =>
>>     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
>>   case stage: ResultStage =>
>>     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
>> }
>>
>> taskBinary = sc.broadcast(taskBinaryBytes)
>>
>> Could somebody give me pointers as to what is involved if we want to
>> change it to KryoSerializer?
>>
>> One suggestion here,
>>
>> http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html
>>
>> was to use chill-scala's KryoSerializer for closureSerializer:
>>
>> private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
>>
>> But on digging into the code, it looks like the KryoSerializer being used
>> already comes from the Twitter chill library. In KryoSerializer.scala:
>>
>> val instantiator = new EmptyScalaKryoInstantiator
>> val kryo = instantiator.newKryo()
>>
>> ----------------------------------------
>>
>> package com.twitter.chill
>> class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
>>   override def newKryo(): com.twitter.chill.KryoBase = { /* compiled code */ }
>> }
>>
>> I am working on a low-latency job, and much of the time is spent
>> serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
>> Thoughts? Is this reasonable? I wanted to check whether shifting to
>> KryoSerializer helps here.
>>
>> I am serializing a UnionRDD which is created by code like this
>> (rdds here is a list of SchemaRDDs):
>>
>> val condition = 'column === indexValue
>> val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
>> val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))
>> val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))
>> val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)
>>
>> My partitioner above selects one partition (out of 100) per RDD from the
>> list passed to UnionRDD; the UnionRDD finally created has 127 partitions.
>>
>> Calling unioned.collect leads to serialization of the UnionRDD.
>>
>> I am using Spark 1.2.1.
>>
>> Any help regarding this will be highly appreciated.
>>
>> Best,
>> Yash Datta
>>
>> --
>> When events unfold with calm and ease
>> When the winds that blow are merely breeze
>> Learn from nature, from birds and bees
>> Live your life in love, and let joy not cease.
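Before switching serializers, it can help to confirm where the ~140 ms goes. A minimal sketch, assuming `unioned` is the UnionRDD built above, of timing the same kind of serialization DAGScheduler performs when it builds taskBinaryBytes (here just the RDD, not the full (rdd, func) pair):

import org.apache.spark.SparkEnv

// Sketch: serialize the RDD with the configured closure serializer,
// mirroring the DAGScheduler snippet quoted above, and report time/size.
val closureSer = SparkEnv.get.closureSerializer.newInstance()
val start = System.nanoTime()
val bytes = closureSer.serialize(unioned: AnyRef).array()
val elapsedMs = (System.nanoTime() - start) / 1e6
println(s"serialized ${bytes.length} bytes in $elapsedMs ms")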
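For anyone who wants to experiment despite the known issues Josh points to: a minimal sketch, assuming Spark 1.x where SparkEnv still reads the spark.closure.serializer setting (later removed), of pointing the closure serializer at Kryo. Closures Kryo cannot handle will fail at task-serialization time; see SPARK-7708.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: select Kryo as the closure serializer via the Spark 1.x setting.
val conf = new SparkConf()
  .setAppName("kryo-closure-experiment")
  .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)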
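Finally, a minimal sketch, outside Spark entirely, of driving chill-scala's EmptyScalaKryoInstantiator (the same instantiator KryoSerializer.scala uses) to round-trip a small value. This is a quick way to check in isolation whether Kryo copes with a given object graph before wiring it into the scheduler.

import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.EmptyScalaKryoInstantiator

// Sketch: build a Kryo instance from chill-scala and round-trip a value.
val kryo = (new EmptyScalaKryoInstantiator).newKryo()
val out = new Output(4096, -1) // initial buffer 4 KB, grow without limit
kryo.writeClassAndObject(out, Seq(1, 2, 3))
out.close()
val back = kryo.readClassAndObject(new Input(out.toBytes))
println(back) // List(1, 2, 3)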