The 2 plans look similar, but there is a big difference between them once you 
consider that your source is in fact a NoSQL DB like C*.


The OR plan has "Filter ((id#0L = 94) || (id#0L = 2))", which means the filter 
is actually applied on the Spark side instead of on the C* side. To fulfill 
your query, Spark has to load all the data from C* (imagine you have millions 
of IDs), filter most of it out, and keep only the rows with id 94 and 2. IO is 
the bottleneck in this case, since a huge amount of data has to be transferred 
from C* to Spark.
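
A quick way to spot this in explain() output is to look for a "Filter" node 
above the Scan. Here is a minimal sketch in Python, not something taken from 
Spark or the connector: the function name spark_side_filters is hypothetical, 
and the plan strings are abbreviated from the plans quoted further down in 
this thread.

```python
import re


def spark_side_filters(plan_text):
    """Return the predicates of Filter nodes found in explain() text."""
    filters = []
    for line in plan_text.splitlines():
        # Plan nodes are printed as "+- NodeName ...". The "PushedFilters:"
        # label belongs to a Scan node and is not preceded by "+- ", so it
        # does not match here.
        match = re.search(r"\+- Filter (.*)", line)
        if match:
            filters.append(match.group(1).strip(" |"))
    return filters


# Abbreviated from the two plans in this thread (assumption: trimmed by hand).
OR_PLAN = """\
TungstenAggregate(key=[id#0L], ...)
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- Filter ((id#0L = 94) || (id#0L = 2))
      +- Scan CassandraSourceRelation[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]
"""

IN_PLAN = """\
TungstenAggregate(key=[id#0L], ...)
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- Scan CassandraSourceRelation[id#0L,avg#2] PushedFilters: [In(id,...)]
"""

or_filters = spark_side_filters(OR_PLAN)
in_filters = spark_side_filters(IN_PLAN)
```

A non-empty result only says that Spark evaluates that predicate itself. How 
much data the source returns still depends on what the connector does with 
the pushed filters.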


In the other case, the ids are pushed down to C* (and in most cases the id is 
the primary key, or at least the partition key), so C* finds the data for 
these 2 ids very quickly and returns only the matching rows to Spark, which 
then does the aggregation on a very small amount of data. That is why the 
performance differs so much between these 2 cases.
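
To give a feeling for the difference in rows moved, here is a toy model in 
Python. It is only a sketch under assumptions: a dict stands in for the C* 
table, the 2000 samples per id come from the original question, and the 100 
ids and all function names are made up for illustration.

```python
from collections import defaultdict


def build_table(num_ids, samples_per_id):
    """Toy stand-in for a C* table: partition key (id) -> list of rows."""
    table = defaultdict(list)
    for id_ in range(num_ids):
        for sample in range(samples_per_id):
            table[id_].append((id_, float(sample)))
    return table


def scan_then_filter(table, wanted_ids):
    """Filter on the Spark side: every row is transferred, then filtered."""
    transferred = [row for rows in table.values() for row in rows]
    kept = [row for row in transferred if row[0] in wanted_ids]
    return len(transferred), kept


def pushed_down(table, wanted_ids):
    """Filter at the source: only the requested partitions are read."""
    transferred = [row for id_ in wanted_ids for row in table.get(id_, [])]
    return len(transferred), transferred


table = build_table(num_ids=100, samples_per_id=2000)
full_count, full_rows = scan_then_filter(table, {94, 2})
push_count, push_rows = pushed_down(table, {94, 2})
```

Both paths end with the same rows for ids 94 and 2, but the first one moves 
the whole table to get there, and the gap grows with the number of ids.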


You can argue that the Spark-Cassandra connector should be smarter about 
handling the "OR" case. But in general OR is not easy to handle, because in 
most cases "OR" is applied across different columns, not only to a single 
column such as the ID here.
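
For the narrow case where it would be possible, the rewrite could look 
something like the sketch below, in Python. This is not how the connector 
works; the classes EqualTo, Or and In are hypothetical stand-ins that only 
borrow the names shown under PushedFilters, and or_to_in is a made-up helper.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class EqualTo:
    column: str
    value: object


@dataclass(frozen=True)
class Or:
    left: object
    right: object


@dataclass(frozen=True)
class In:
    column: str
    values: Tuple[object, ...]


def or_to_in(pred) -> Optional[In]:
    """Collapse an OR tree of equalities on ONE column into In, else None."""
    if isinstance(pred, EqualTo):
        return In(pred.column, (pred.value,))
    if isinstance(pred, Or):
        left = or_to_in(pred.left)
        right = or_to_in(pred.right)
        # Only safe when both sides constrain the same column.
        if left is not None and right is not None and left.column == right.column:
            return In(left.column, left.values + right.values)
    return None


same_column = or_to_in(Or(EqualTo("id", 94), EqualTo("id", 2)))
mixed_columns = or_to_in(Or(EqualTo("id", 94), EqualTo("avg", 2)))
```

The second call returns None, which is the general problem: as soon as the OR 
spans different columns there is no single key lookup to turn it into.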


If your query uses partition keys in C*, always filter on them with either "=" 
or "in". If you do not, you have to wait for the data to be transferred from 
C* to Spark. Spark + C* lets you run any ad-hoc query, but you need to know 
the underlying price you pay.


Yong


________________________________
From: Takeshi Yamamuro <linguin....@gmail.com>
Sent: Thursday, August 4, 2016 8:18 AM
To: Marco Colombo
Cc: user
Subject: Re: Spark SQL and number of task

It seems the performance difference comes from `CassandraSourceRelation`.
I'm not familiar with the implementation, but I guess the `IN` filter is 
pushed down into the datasource and the other one is not.

You'd be better off checking the performance metrics in the web UI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo 
<ing.marco.colo...@gmail.com> wrote:
Ok, thanx.
The 2 plans are very similar

with the in condition
== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
      +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]

with the or condition
== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
      +- Filter ((id#0L = 94) || (id#0L = 2))
         +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]


Filters are pushed down, so I cannot understand why it performs such a big data 
extraction in the or case. It's like a full table scan.

Any advice?

Thanks!


2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro 
<linguin....@gmail.com>:
Hi,

Please type `sqlCtx.sql("select * .... ").explain` to show execution plans.
Also, you can kill jobs from webUI.

// maropu


On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo 
<ing.marco.colo...@gmail.com> wrote:
Hi all, I've a question on how hive+spark are handling data.

I've started a new HiveContext and I'm extracting data from cassandra.
I've configured spark.sql.shuffle.partitions=10.
Now, I have the following query:

select d.id, avg(d.avg) from v_points d where id=90 group by id;

I see that 10 tasks are submitted and execution is fast. Every id in that table 
has 2000 samples.

But if I just add a new id, as:

select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;

it adds 663 tasks and the query does not end.

If I write the query with in () like

select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;

the query is again fast.

How can I get the 'execution plan' of the query?

And also, how can I kill the long running submitted tasks?

Thanks all!



--
---
Takeshi Yamamuro



--
Ing. Marco Colombo



--
---
Takeshi Yamamuro
