[ https://issues.apache.org/jira/browse/SPARK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495475#comment-14495475 ]

Yin Huai commented on SPARK-6368:
---------------------------------

I am adding the results of a simple benchmark.

I built master with
{code}
build/sbt -Phive assembly
{code}
Then, I launched the Spark shell with
{code}
bin/spark-shell --master local-cluster[1,1,4096] \
  --conf spark.executor.memory=4096m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.compress=false \
  --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/tmp" \
  --conf fs.local.block.size=536870912 \
  -v
{code}
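
(As a quick check, not part of the original run, the effective settings can be confirmed from the shell via {{sc.getConf}}.)
{code}
// Optional sanity check (assumption: run in the same shell session):
// confirm the serializer and shuffle settings actually took effect.
println(sc.getConf.get("spark.serializer"))        // org.apache.spark.serializer.KryoSerializer
println(sc.getConf.get("spark.shuffle.compress"))  // false
{code}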

The following code was used to generate and cache the test data:
{code}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// One column per supported data type.
val supportedTypes =
  Seq(StringType, BinaryType, BooleanType,
    ByteType, ShortType, IntegerType, LongType,
    DoubleType, DecimalType.Unlimited, DecimalType(6, 5),
    DateType, TimestampType)

val fields = supportedTypes.zipWithIndex.map { case (dataType, index) =>
  StructField(s"col$index", dataType, nullable = true)
}
val allColumns = fields.map(_.name).mkString(",")
val schema = StructType(fields)

// 8 partitions x 1,000,000 rows each = 8 million rows in total.
val rdd =
  sc.parallelize(1 to 8, 8).flatMap { j =>
    (1 to 1000000).map { i =>
      Row(
        s"str${i}: test serializer2.",
        s"binary${i}: test serializer2.".getBytes("UTF-8"),
        i % 2 == 0,
        i.toByte,
        i.toShort,
        i,
        i.toLong,
        i + 0.75,
        BigDecimal(Long.MaxValue.toString + ".12345"),
        new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
        new java.sql.Date(i),
        new java.sql.Timestamp(i))
    }
  }

sqlContext.createDataFrame(rdd, schema).registerTempTable("shuffle")

sqlContext.sql("cache table shuffle")
{code}
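
(As a sanity check, not part of the original benchmark, the cached table should contain 8,000,000 rows; {{sqlContext.table}} returns the registered DataFrame.)
{code}
// Expect 8,000,000 rows (8 partitions x 1,000,000 rows each).
println(sqlContext.table("shuffle").count())
{code}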

The query was 
{code}
// Run just the Exchange operator and discard the output rows.
sql(s"""
  select
    ${allColumns}
  from shuffle
  cluster by
    ${allColumns}""").queryExecution.executedPlan(1).execute().foreach(_ => ())
{code}
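
(For reference: {{executedPlan(1)}} selects the operator at position 1 in the physical plan tree, which should be the Exchange for this query. Printing the plan, an optional step not in the original, shows the operator tree:)
{code}
// Optional: inspect the physical plan to see where the Exchange sits.
println(sql(s"select ${allColumns} from shuffle cluster by ${allColumns}")
  .queryExecution.executedPlan)
{code}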

With the new serializer ({{sqlContext.sql("set spark.sql.useSerializer2=true")}}), 
the execution time was 84.750775s. With Kryo ({{sqlContext.sql("set 
spark.sql.useSerializer2=false")}}), it was 189.129494s, roughly 2.2x slower. I am 
also attaching the profiling results from visualvm (collected with its sampler). 
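
To reproduce the timing, a small helper (hypothetical, not part of the original run) can bracket the query under each setting:
{code}
// Hypothetical timing helper (not from the original benchmark):
// wall-clock a block and return seconds.
def time[T](body: => T): Double = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1e9
}

sqlContext.sql("set spark.sql.useSerializer2=true")  // or false for the Kryo path
println(time {
  sql(s"select ${allColumns} from shuffle cluster by ${allColumns}")
    .queryExecution.executedPlan(1).execute().foreach(_ => ())
})
{code}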

> Build a specialized serializer for Exchange operator. 
> ------------------------------------------------------
>
>                 Key: SPARK-6368
>                 URL: https://issues.apache.org/jira/browse/SPARK-6368
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Yin Huai
>            Priority: Critical
>
> Kryo is still pretty slow because it works on individual objects and is 
> relatively expensive to allocate. For the Exchange operator, because the 
> schemas of the key and value are already defined, we can create a specialized 
> serializer that handles those specific schemas. 
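
To make the idea concrete: with a fixed schema, fields can be written in schema order with no per-object class metadata. The sketch below is illustrative only (it is not the actual serializer built for this issue) and assumes a toy (Int, Long, String) row:
{code}
// Illustrative sketch (hypothetical): a schema-specialized writer/reader
// for rows of (Int, Long, String). Fixed-width fields are written directly;
// the variable-width string gets a length prefix.
import java.io.{DataInputStream, DataOutputStream}

def writeRow(out: DataOutputStream, a: Int, b: Long, c: String): Unit = {
  out.writeInt(a)
  out.writeLong(b)
  val bytes = c.getBytes("UTF-8")
  out.writeInt(bytes.length)
  out.write(bytes)
}

def readRow(in: DataInputStream): (Int, Long, String) = {
  val a = in.readInt()
  val b = in.readLong()
  val bytes = new Array[Byte](in.readInt())
  in.readFully(bytes)
  (a, b, new String(bytes, "UTF-8"))
}
{code}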


