RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread Cheng, Hao
This is a very interesting issue. The root cause of the lower performance is 
probably that, for a Scala UDF, Spark SQL converts each value from its internal 
representation to the Scala representation recursively, via Scala reflection.

Can you create a Jira issue for tracking this? I can start to work on the 
improvement soon.
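
To make the suspected overhead concrete, here is a minimal plain-Scala sketch. It is not Spark's actual code: `convertToScala`, `viaGenericPath` and `viaDirectPath` are hypothetical names that only mimic the shape of the work, assuming the UDF argument arrives boxed as `Any` and passes through a recursive conversion step before the function is applied.

```scala
object UdfOverheadSketch {
  // Hypothetical stand-in for a recursive "internal -> Scala" conversion.
  // This is NOT Spark's implementation; it only imitates the extra per-value work.
  def convertToScala(value: Any): Any = value match {
    case seq: Seq[_] => seq.map(v => convertToScala(v)) // recurse into nested values
    case other       => other                           // simple values pass through, still boxed
  }

  // The UDF body from the test case.
  val floorTo300: Int => Int = ts => ts - ts % 300

  // UDF-style path: the argument is an Any, gets converted, unboxed, then applied.
  def viaGenericPath(raw: Any): Any =
    floorTo300(convertToScala(raw).asInstanceOf[Int])

  // Built-in-expression-style path: plain primitive arithmetic.
  def viaDirectPath(ts: Int): Int = ts - ts % 300

  def main(args: Array[String]): Unit = {
    val ts = 1427101810
    // Both paths compute the same bucket; only the per-row overhead differs.
    assert(viaGenericPath(ts) == viaDirectPath(ts))
    println(viaDirectPath(ts)) // prints 1427101800
  }
}
```

The two paths return the same value, so any difference between them is pure per-row overhead (boxing, pattern matching, casting), which is the kind of cost that adds up over a 250G scan.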

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 nodes, each with 80G/20C
3. Read 250G of Parquet files from HDFS

Test case:

1. Register a floor function with the command:

    sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)

   then run the SQL:

    select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)

   It takes 17 minutes.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
      Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. Run the SQL with the expression inlined:

    select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)

   It takes only 5 minutes.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
      Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. Use HiveContext with the SQL:

    select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))

   It takes only 5 minutes too.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
      Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why is ScalaUdf so slow? How can I improve it?

View this message in context: Spark SQL udf(ScalaUdf) is very slow
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-udf-ScalaUdf-is-very-slow-tp22185.html
Sent from the Apache Spark User List mailing list archive 
(http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com.


Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread zzcclp
My test env:
1. Spark version is 1.3.0
2. 3 nodes, each with 80G/20C
3. Read 250G of Parquet files from HDFS

Test case:

1. Register a floor function with the command:

    sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)

   then run the SQL:

    select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)

   It takes 17 minutes.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
      Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. Run the SQL with the expression inlined:

    select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)

   It takes only 5 minutes.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
      Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. Use HiveContext with the SQL:

    select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))

   It takes only 5 minutes too.

    == Physical Plan ==
    Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
     Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
      Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
       PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why is ScalaUdf so slow? How can I improve it?
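
As a sanity check that the three queries group on the same key, the expression `ts - ts % 300` can be tried in plain Scala. This is only a sketch: `bucket` is a hypothetical helper name, the sample timestamps are made up, and it assumes `ts` holds non-negative epoch seconds (the thread does not state this).

```scala
object BucketCheck {
  // Same expression as the UDF body and the inlined SQL in the test case.
  def bucket(ts: Int): Int = ts - ts % 300

  def main(args: Array[String]): Unit = {
    // Timestamps inside one 300-second window map to the window start.
    assert(bucket(1427101800) == 1427101800)
    assert(bucket(1427101810) == 1427101800)
    assert(bucket(1427102099) == 1427101800)
    // The next window starts exactly 300 seconds later.
    assert(bucket(1427102100) == 1427102100)
    // Scala's % truncates toward zero, so a negative input is rounded
    // up toward 0 rather than down; this is not a true floor.
    assert(bucket(-1) == 0)
  }
}
```

For non-negative input the result is already a whole number, so wrapping it in floor as in case 3 leaves the value unchanged. The difference between 17 and 5 minutes therefore comes from how the expression is evaluated, not from what it computes.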




Re: RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread ??o0/ka????
Hi Cheng Hao, thank you for your reply.

I created an issue for this: https://issues.apache.org/jira/browse/SPARK-6483







