Re: Kryo serialization does not compress
Hi Patrick, Thanks for your reply. I am guessing that array types are also registered automatically. Is this correct? Thanks, Pradeep -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-serialization-does-not-compress-tp2042p2400.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Kryo serialization does not compress
Hey there, This is interesting... thanks for sharing this. If you are storing with MEMORY_ONLY, then you are storing deserialized Java objects directly in the JVM heap. They can't be compressed because they aren't stored in a known byte format; their layout is left up to the JVM. Compression only applies to serialized partitions, which is why enabling it shrinks MEMORY_ONLY_SER but has no effect on MEMORY_ONLY. To answer your other question, it's possible that serializing doesn't provide big space savings for your objects, especially if you are serializing mostly primitive types. It depends a bit on the types of the objects. One more thing: it would be good to register all of the object types you plan to serialize with Kryo; otherwise Kryo has to store the full class name with each object, which adds overhead: http://spark.incubator.apache.org/docs/latest/tuning.html But if you are only serializing simple types, these (I think) get registered automatically. On Thu, Mar 6, 2014 at 2:58 AM, pradeeps8 wrote: > We are trying to use kryo serialization, but with kryo serialization ON the > memory consumption does not change. We have tried this on multiple sets of > data. > We have also checked the logs of Kryo serialization and have confirmed that > Kryo is being used. > > Can somebody please help us with this? > > The script used is given below. 
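Patrick's two points (primitive-heavy data gains little from serialization; compression can only act on serialized bytes) can be illustrated without Spark. The sketch below is a simplified model, not Spark or Kryo code: it serializes a float array in a hypothetical format (a 4-byte length prefix plus 4 bytes per float), estimates the on-heap size of the same array assuming a 64-bit JVM with compressed oops (16-byte array header, 8-byte alignment), and compresses the bytes with the JDK's `Deflater` standing in for LZ4. The thread's real records also contain strings and tuples, so this only models the float-array part.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.zip.Deflater

object FloatArraySizes {
  // Hypothetical stand-in format (not Kryo's wire format):
  // a 4-byte length prefix followed by 4 raw bytes per float.
  def serialize(values: Array[Float]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(values.length)
    values.foreach(v => out.writeFloat(v))
    out.flush()
    bytes.toByteArray
  }

  // Rough on-heap size of an Array[Float], ASSUMING a 64-bit JVM with
  // compressed oops: 16-byte header + 4 bytes/element, padded to 8 bytes.
  def estimatedHeapBytes(n: Int): Long = {
    val raw = 16L + 4L * n
    (raw + 7) / 8 * 8
  }

  // Compressed length of a byte buffer, using the JDK's Deflater as a
  // stand-in for LZ4. Only serialized bytes can be fed to a codec like this.
  def compressedSize(data: Array[Byte]): Int = {
    val deflater = new Deflater()
    deflater.setInput(data)
    deflater.finish()
    val buffer = new Array[Byte](64 * 1024)
    var total = 0
    while (!deflater.finished()) {
      total += deflater.deflate(buffer)
    }
    deflater.end()
    total
  }

  def main(args: Array[String]): Unit = {
    val n = 1000000
    val rng = new scala.util.Random(42)
    val noisy = Array.fill(n)(rng.nextFloat())                // little redundancy
    val repetitive = Array.tabulate(n)(i => (i % 10).toFloat) // highly redundant

    val noisyBytes = serialize(noisy)
    println(s"estimated heap bytes:  ${estimatedHeapBytes(n)}")
    println(s"serialized bytes:      ${noisyBytes.length}")
    println(s"compressed (noisy):    ${compressedSize(noisyBytes)}")
    println(s"compressed (repeats):  ${compressedSize(serialize(repetitive))}")
  }
}
```

For n = 1,000,000 the first two lines print 4000016 and 4000004, so under these assumptions serializing a float array saves almost nothing. The compressed sizes depend on the data: the repetitive array shrinks far more than the random one.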
Re: Kryo serialization does not compress
We are trying to use Kryo serialization, but with Kryo serialization ON the memory consumption does not change. We have tried this on multiple sets of data. We have also checked the Kryo serialization logs and confirmed that Kryo is being used. Can somebody please help us with this? The script used is given below.

SCRIPT

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.Buffer
import scala.Array
import scala.math.Ordering.Implicits._

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.RangePartitioner
import org.apache.spark.HashPartitioner

// For Kryo logging
import com.esotericsoftware.minlog.Log
import com.esotericsoftware.minlog.Log._
Log.set(LEVEL_TRACE)

val query = "select array(level_1, level_2, level_3, level_4, level_5, level_6, level_7, level_8, level_9, " +
  "level_10, level_11, level_12, level_13, level_14, level_15, level_16, level_17, level_18, level_19, level_20, " +
  "level_21, level_22, level_23, level_24, level_25) as unitids, class, cuts, type, data " +
  "from table1 p join table2 b on (p.UnitId = b.unit_id) " +
  "where runid = 912 and b.snapshotid = 220 and p.UnitId = b.unit_id"

val rows: RDD[((Buffer[Any], String, Buffer[Any]), (String, Buffer[Any]))] =
  sc.sql2rdd(query).map(row =>
    ((row.getList("unitids").asInstanceOf[java.util.List[Any]].asScala,
      row.getString("class"),
      row.getList("cuts").asInstanceOf[java.util.List[Any]].asScala),
     (row.getString("type"),
      row.getList("data").asInstanceOf[java.util.List[Any]].asScala)))

// Convert the "data" column to Array[Float]
var rows2Array: RDD[((Buffer[Any], String, Buffer[Any]), (String, Array[Float]))] =
  rows.map(row => (row._1, (row._2._1, row._2._2.map {
    case floatWritable: org.apache.hadoop.io.FloatWritable => floatWritable.get
    case lazyFloat: org.apache.hadoop.hive.serde2.`lazy`.LazyFloat => lazyFloat.getWritableObject().get
    case y => println("unknown data type " + y + " : "); 0f
  }.toArray)))

// Convert the "unitids" column to Array[Long]
var allArrays: RDD[((Array[Long], String, Buffer[Any]), (String, Array[Float]))] =
  rows2Array.map(row => ((row._1._1.map {
    case longWritable: org.apache.hadoop.io.LongWritable => longWritable.get
    case lazyLong: org.apache.hadoop.hive.serde2.`lazy`.LazyLong => lazyLong.getWritableObject().get
    case x => println("unknown data type " + x + " : "); 0L
  }.toArray, row._1._2, row._1._3), row._2))

// Convert the "cuts" column to Array[String]
var dataRdd: RDD[((Array[Long], String, Array[String]), (String, Array[Float]))] =
  allArrays.map(row => ((row._1._1, row._1._2, row._1._3.map {
    case str: String => str
    case x => println("unknown data type " + x + " : "); ""
  }.toArray), row._2))

dataRdd = dataRdd.partitionBy(new HashPartitioner(64)).persist(StorageLevel.MEMORY_ONLY_SER)
dataRdd.count()
Kryo serialization does not compress
Hi All, We are currently trying to benchmark the various cache options on RDDs with respect to speed and efficiency. The data that we are using is mostly numbers (floating point). We have noticed that the RDD consumes almost the same amount of memory with MEMORY_ONLY (519.1 MB) as with MEMORY_ONLY_SER (511.5 MB), which uses Kryo serialization. Is this behavior expected? We were under the impression that Kryo serialization is efficient and were expecting it to reduce the size further. Also, we have noticed that when we enable compression (LZ4) on RDDs, the memory consumption of the RDD for MEMORY_ONLY with compression is the same as without compression, i.e. 519.1 MB, whereas MEMORY_ONLY_SER (Kryo serialization) with compression consumes only 386.5 MB. Why does enabling compression have no effect with MEMORY_ONLY? Is there anything else we need to do to get MEMORY_ONLY data compressed? Thanks, Pradeep
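To put the reported figures in proportion, the savings can be computed directly from the three sizes in the post (519.1 MB, 511.5 MB and 386.5 MB); nothing here is a new measurement. A minimal Scala sketch, where `saving` is a hypothetical helper returning the fractional reduction of one size relative to another:

```scala
object ReportedSizes {
  // Sizes reported in the original post, in MB.
  val memoryOnly = 519.1        // MEMORY_ONLY (deserialized objects)
  val memoryOnlySer = 511.5     // MEMORY_ONLY_SER with Kryo
  val memoryOnlySerLz4 = 386.5  // MEMORY_ONLY_SER with Kryo + LZ4

  // Fractional reduction of `after` relative to `before`.
  def saving(before: Double, after: Double): Double = 1.0 - after / before

  def main(args: Array[String]): Unit = {
    println(f"Kryo vs. deserialized:       ${saving(memoryOnly, memoryOnlySer) * 100}%.1f%%")
    println(f"LZ4 on top of Kryo:          ${saving(memoryOnlySer, memoryOnlySerLz4) * 100}%.1f%%")
    println(f"Kryo + LZ4 vs. deserialized: ${saving(memoryOnly, memoryOnlySerLz4) * 100}%.1f%%")
  }
}
```

It prints 1.5%, 24.4% and 25.5%: Kryo alone saved very little over deserialized storage, and nearly all of the reduction came from compressing the serialized bytes.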