Re: Spark SQL Query Plan optimization

2014-08-02 Thread Michael Armbrust
The number of partitions (which determines the number of tasks) is fixed after
any shuffle and can be configured using 'spark.sql.shuffle.partitions' through
SQLConf (i.e. sqlContext.set(...) or "SET spark.sql.shuffle.partitions=..." in
SQL).  It is possible we will auto-select this based on statistics in the
future.
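
For example, a minimal sketch along those lines, reusing the sqlContext and the
registered emp/sal tables from the code below (the value 10 is only an
illustrative choice):

    // Either set the property programmatically through SQLConf...
    sqlContext.set("spark.sql.shuffle.partitions", "10")
    // ...or issue the equivalent SET command as SQL.
    sql("SET spark.sql.shuffle.partitions=10")

    // The Exchange operators of any later shuffle (such as the join below)
    // will then produce 10 partitions, and hence 10 tasks, rather than the
    // 150 shown in the plan.
    sql("SELECT emp.key, value, salary FROM emp, sal WHERE emp.key = 30 AND emp.key = sal.key")
      .collect().foreach(println)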

I think you might be reading the query plan backwards.  The data starts at
the bottom and moves upwards.  The filter is being performed before the
shuffle (exchange) and join operations.
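
Concretely, annotating the plan from your message from the bottom up (the step
numbers are mine):

    Project [key#0:0,value#1:1,salary#3:3]        <- 5. select the output columns
     HashJoin [key#0], [key#2], BuildRight        <- 4. join the two shuffled inputs
      Exchange (HashPartitioning [key#0:0], 150)  <- 3. shuffle only the filtered emp rows
       Filter (key#0:0 = 30)                      <- 2. apply emp.key = 30
        ExistingRdd [key#0,value#1]               <- 1. scan the emp data
      Exchange (HashPartitioning [key#2:0], 150)  <- 3. shuffle the sal rows
       ExistingRdd [key#2,salary#3]               <- 1. scan the sal data

So the predicate is already pushed below the Exchange, i.e. it runs before the
shuffle and the join, which is the behavior you were expecting.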


On Fri, Aug 1, 2014 at 10:13 AM, N.Venkata Naga Ravi wrote:

>  Hi,
>
> I am trying to understand the query plan and number of tasks /execution
> time created for joined query.
>
> Consider following example , creating two tables emp, sal with appropriate
> 100 records in each table with key for joining them.
>
> EmpRDDRelation.scala
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
>
> case class EmpRecord(key: Int, value: String)
> case class SalRecord(key: Int, salary: Int)
>
> object EmpRDDRelation {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
>     val sc = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sc)
>
>     // Importing the SQL context gives access to all the SQL functions and
>     // implicit conversions.
>     import sqlContext._
>
>     var rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
>     rdd.registerAsTable("emp")
>
>     // Once tables have been registered, you can run SQL queries over them.
>     println("Result of SELECT *:")
>     sql("SELECT * FROM emp").collect().foreach(println)
>
>     var salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))
>     salrdd.registerAsTable("sal")
>     sql("SELECT * FROM sal").collect().foreach(println)
>
>     var salRRDFromSQL = sql("SELECT emp.key,value,salary from emp,sal WHERE emp.key=30 AND emp.key=sal.key")
>     salRRDFromSQL.collect().foreach(println)
>   }
> }
>
> Here are my observations:
>
> Below is the query plan for the above join query, which creates 150 tasks. I
> can see that a Filter is added in the plan, but I am not sure whether it is
> applied in an optimized way. First of all, it is not clear why 150 tasks are
> required, because I see the same 150 tasks when I execute the join query
> without the filter "emp.key=30", i.e. "SELECT emp.key,value,salary from
> emp,sal WHERE emp.key=sal.key", and both cases take the same time. My
> understanding is that the emp.key=30 filter should be applied first, and the
> join with the sal table should then run on the filtered emp records (from an
> Oracle RDBMS perspective). But here the query plan seems to join the tables
> first and apply the filter later. Is there any way we can improve this on
> the code side, or does it require an enhancement on the Spark SQL side?
>
> Please review my observations and let me know your comments.
>
>
> == Query Plan ==
> Project [key#0:0,value#1:1,salary#3:3]
>  HashJoin [key#0], [key#2], BuildRight
>   Exchange (HashPartitioning [key#0:0], 150)
>    Filter (key#0:0 = 30)
>     ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
>   Exchange (HashPartitioning [key#2:0], 150)
>    ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174), which is now runnable
> 14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 (SchemaRDD[8] at RDD at SchemaRDD.scala:98
> == Query Plan ==
> Project [key#0:0,value#1:1,salary#3:3]
>  HashJoin [key#0], [key#2], BuildRight
>   Exchange (HashPartitioning [key#0:0], 150)
>    Filter (key#0:0 = 30)
>     ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
>   Exchange (HashPartitioning [key#2:0], 150)
>    ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174)
> 14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
>


Spark SQL Query Plan optimization

2014-08-01 Thread N. Venkata Naga Ravi

Hi,

I am trying to understand the query plan and number of tasks /execution time 
created for joined query.

Consider following example , creating two tables emp, sal with appropriate 100 
records in each table with key for joining them.

EmpRDDRelation.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and
    // implicit conversions.
    import sqlContext._

    var rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    var salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))
    salrdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    var salRRDFromSQL = sql("SELECT emp.key,value,salary from emp,sal WHERE emp.key=30 AND emp.key=sal.key")
    salRRDFromSQL.collect().foreach(println)
  }
}

Here are my observations:

Below is the query plan for the above join query, which creates 150 tasks. I
can see that a Filter is added in the plan, but I am not sure whether it is
applied in an optimized way. First of all, it is not clear why 150 tasks are
required, because I see the same 150 tasks when I execute the join query
without the filter "emp.key=30", i.e. "SELECT emp.key,value,salary from
emp,sal WHERE emp.key=sal.key", and both cases take the same time. My
understanding is that the emp.key=30 filter should be applied first, and the
join with the sal table should then run on the filtered emp records (from an
Oracle RDBMS perspective). But here the query plan seems to join the tables
first and apply the filter later. Is there any way we can improve this on the
code side, or does it require an enhancement on the Spark SQL side?

Please review my observations and let me know your comments.


== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174), which is now runnable
14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 (SchemaRDD[8] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174)
14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks