Re: Spark SQL and number of task

2016-08-04 Thread Marco Colombo
Thanks a lot. That was my suspicion.
What was puzzling me is that even in the OR case the pushdown appears in
the explain plan from Hive, while it is effectively not performed by the
client.

Regards

2016-08-04 15:37 GMT+02:00 Yong Zhang :

> The 2 plans look similar, but there is a big difference between them, if
> you also consider that your source is in fact a NoSQL DB, like C*.
>
>
> The OR plan has "Filter ((id#0L = 94) || (id#0L = 2))", which means the
> filter is actually happening on the Spark side, instead of on the C* side.
> That means that to fulfill your query, Spark has to load all the data back
> from C* (imagine you have millions of ids), filter most of them out, and
> only keep the rows with id 94 and 2. I/O is the bottleneck in this case,
> and a huge amount of data needs to be transferred from C* to Spark.
>
>
> In the other case, the ids are pushed down to C* (and in most cases the
> id is the primary key, or at least the partition key), so C* will find the
> data for these 2 ids very fast and only return the matching data back to
> Spark, which then does the aggregation on a very small dataset. That is
> why the performance differs so much between the 2 cases.
>
>
> You can argue that the Spark-Cassandra connector should be smarter in
> handling the "OR" case. But in general, OR is not easy to handle, as in
> most cases "OR" is applied across different columns, instead of only on
> ids as in this case.
>
>
> If your query uses partition keys in C*, always use them with either
> "=" or "in". If not, then you have to wait for the data transfer from C*
> to Spark. Spark + C* lets you run any ad-hoc query, but you need to know
> the underlying price you pay.
>
>
> Yong
>
>
> ------------------
> *From:* Takeshi Yamamuro 
> *Sent:* Thursday, August 4, 2016 8:18 AM
> *To:* Marco Colombo
> *Cc:* user
> *Subject:* Re: Spark SQL and number of task
>
> Seems the performance difference comes from `CassandraSourceRelation`.
> I'm not familiar with the implementation, but I guess the `IN` filter is
> pushed down into the datasource and the other is not.
>
> You'd be better off checking the performance metrics in the Web UI.
>
> // maropu
>
> On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo wrote:
>
>> Ok, thanx.
>> The two plans are very similar:
>>
>> with in condition
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
>> +- TungstenExchange hashpartitioning(id#0L,10), None
>>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
>>       +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]
>>
>> with the or condition
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
>> +- TungstenExchange hashpartitioning(id#0L,10), None
>>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
>>       +- Filter ((id#0L = 94) || (id#0L = 2))
>>          +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]

Re: Spark SQL and number of task

2016-08-04 Thread Yong Zhang
The 2 plans look similar, but there is a big difference between them, if you
also consider that your source is in fact a NoSQL DB, like C*.


The OR plan has "Filter ((id#0L = 94) || (id#0L = 2))", which means the filter
is actually happening on the Spark side, instead of on the C* side. That means
that to fulfill your query, Spark has to load all the data back from C*
(imagine you have millions of ids), filter most of them out, and only keep the
rows with id 94 and 2. I/O is the bottleneck in this case, and a huge amount of
data needs to be transferred from C* to Spark.


In the other case, the ids are pushed down to C* (and in most cases the id is
the primary key, or at least the partition key), so C* will find the data for
these 2 ids very fast and only return the matching data back to Spark, which
then does the aggregation on a very small dataset. That is why the performance
differs so much between the 2 cases.


You can argue that the Spark-Cassandra connector should be smarter in handling
the "OR" case. But in general, OR is not easy to handle, as in most cases "OR"
is applied across different columns, instead of only on ids as in this case.


If your query uses partition keys in C*, always use them with either "=" or
"in". If not, then you have to wait for the data transfer from C* to Spark.
Spark + C* lets you run any ad-hoc query, but you need to know the underlying
price you pay.
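As a conceptual sketch of the mechanism described above (plain Python with a toy in-memory "table"; the filter names only mimic Spark's `In`/`Or` source filters, and this is not the actual connector code): a source declares which filters it handles, and anything it cannot handle stays in Spark as a post-scan Filter applied after all rows have already been transferred.

```python
# Toy "C*" table: partition key id -> 3 sample rows (id, avg) each.
TABLE = {i: [(i, float(i))] * 3 for i in range(1000)}

def source_scan(handled_filters):
    """Rows returned by the source, honoring only filters it understands."""
    for kind, values in handled_filters:
        if kind == "In":                       # targeted per-key reads
            return [row for v in values for row in TABLE.get(v, [])]
    return [row for rows in TABLE.values() for row in rows]  # full scan

def run_query(filters):
    handled = [f for f in filters if f[0] == "In"]    # source takes only In
    residual = [f for f in filters if f[0] != "In"]   # Spark keeps the rest
    fetched = source_scan(handled)             # rows crossing the wire
    rows = fetched
    for kind, values in residual:
        if kind == "Or":                       # e.g. Or(EqualTo(id,90),EqualTo(id,2))
            rows = [r for r in rows if r[0] in values]
    return rows, len(fetched)

in_rows, n_in = run_query([("In", (90, 2))])
or_rows, n_or = run_query([("Or", (90, 2))])

assert sorted(in_rows) == sorted(or_rows)  # same answer for the user
assert (n_in, n_or) == (6, 3000)           # but 6 vs 3000 rows transferred
```

Both queries return the same rows; the difference is entirely in how many rows have to leave the source before the residual Filter runs.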


Yong



From: Takeshi Yamamuro 
Sent: Thursday, August 4, 2016 8:18 AM
To: Marco Colombo
Cc: user
Subject: Re: Spark SQL and number of task

Seems the performance difference comes from `CassandraSourceRelation`.
I'm not familiar with the implementation, but I guess the `IN` filter is
pushed down into the datasource and the other is not.

You'd be better off checking the performance metrics in the Web UI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo <ing.marco.colo...@gmail.com> wrote:
Ok, thanx.
The two plans are very similar:

with in condition

== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
      +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]

with the or condition

== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
      +- Filter ((id#0L = 94) || (id#0L = 2))
         +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]


Filters are pushed down, so I cannot understand why it is performing such a big data extraction in the OR case. It's like a full table scan.

Re: Spark SQL and number of task

2016-08-04 Thread Takeshi Yamamuro
Seems the performance difference comes from `CassandraSourceRelation`.
I'm not familiar with the implementation, but I guess the `IN` filter is
pushed down into the datasource and the other is not.

You'd be better off checking the performance metrics in the Web UI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo wrote:

> Ok, thanx.
> The two plans are very similar:
>
> with in condition
>
> == Physical Plan ==
> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
> +- TungstenExchange hashpartitioning(id#0L,10), None
>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
>       +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]
>
> with the or condition
>
> == Physical Plan ==
> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
> +- TungstenExchange hashpartitioning(id#0L,10), None
>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
>       +- Filter ((id#0L = 94) || (id#0L = 2))
>          +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]
>
>
> Filters are pushed down, so I cannot understand why it is performing such a
> big data extraction in the OR case. It's like a full table scan.
>
> Any advice?
>
> Thanks!
>
>
> 2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro :
>
>> Hi,
>>
>> Please type `sqlCtx.sql("select *  ").explain` to show the execution
>> plan.
>> Also, you can kill jobs from the Web UI.
>>
>> // maropu
>>
>>
>> On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <ing.marco.colo...@gmail.com> wrote:
>>
>>> Hi all, I have a question on how Hive + Spark handle data.
>>>
>>> I've started a new HiveContext and I'm extracting data from Cassandra.
>>> I've configured spark.sql.shuffle.partitions=10.
>>> Now, I have the following query:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>>>
>>> I see that 10 tasks are submitted and execution is fast. Every id in that
>>> table has 2000 samples.
>>>
>>> But if I just add a new id, as in:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>>>
>>> it adds 663 tasks and the query does not end.
>>>
>>> If I write the query with IN (), like
>>>
>>> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>>>
>>> the query is again fast.
>>>
>>> How can I get the 'execution plan' of the query?
>>>
>>> And also, how can I kill the long-running submitted tasks?
>>>
>>> Thanks all!
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Ing. Marco Colombo
>



-- 
---
Takeshi Yamamuro


Re: Spark SQL and number of task

2016-08-04 Thread Marco Colombo
Ok, thanx.
The two plans are very similar:

with in condition

== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
      +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]

with the or condition

== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
      +- Filter ((id#0L = 94) || (id#0L = 2))
         +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]


Filters are pushed down, so I cannot understand why it is performing such a big
data extraction in the OR case. It's like a full table scan.
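A rough, purely illustrative way to picture where a task count like 663 can come from (plain Python with made-up numbers, not the connector's actual planner): if a pushed filter cannot be turned into a partition-key restriction on the C* side, the scan has to cover every token range, and each range becomes a task.

```python
# Illustrative only: partition-key lookups vs. a full token-range scan.
TOKEN_RANGES = 663          # hypothetical number of token ranges in the ring

def scan_tasks(pushed_filter):
    # A filter the source can turn into partition-key lookups gives one
    # targeted read per key.
    if pushed_filter[0] == "In":
        return len(pushed_filter[1])
    # Anything else (e.g. Or) cannot restrict the partition key here, so the
    # source falls back to scanning every token range.
    return TOKEN_RANGES

assert scan_tasks(("In", (90, 2))) == 2
assert scan_tasks(("Or", (90, 2))) == 663   # behaves like a full table scan
```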

Any advice?

Thanks!


2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro :

> Hi,
>
> Please type `sqlCtx.sql("select *  ").explain` to show the execution plan.
> Also, you can kill jobs from the Web UI.
>
> // maropu
>
>
> On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo wrote:
>
>> Hi all, I have a question on how Hive + Spark handle data.
>>
>> I've started a new HiveContext and I'm extracting data from Cassandra.
>> I've configured spark.sql.shuffle.partitions=10.
>> Now, I have the following query:
>>
>> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>>
>> I see that 10 tasks are submitted and execution is fast. Every id in that
>> table has 2000 samples.
>>
>> But if I just add a new id, as in:
>>
>> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>>
>> it adds 663 tasks and the query does not end.
>>
>> If I write the query with IN (), like
>>
>> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>>
>> the query is again fast.
>>
>> How can I get the 'execution plan' of the query?
>>
>> And also, how can I kill the long-running submitted tasks?
>>
>> Thanks all!
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Ing. Marco Colombo


Re: Spark SQL and number of task

2016-08-04 Thread Takeshi Yamamuro
Hi,

Please type `sqlCtx.sql("select *  ").explain` to show the execution plan.
Also, you can kill jobs from the Web UI.

// maropu


On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo wrote:

> Hi all, I have a question on how Hive + Spark handle data.
>
> I've started a new HiveContext and I'm extracting data from Cassandra.
> I've configured spark.sql.shuffle.partitions=10.
> Now, I have the following query:
>
> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>
> I see that 10 tasks are submitted and execution is fast. Every id in that
> table has 2000 samples.
>
> But if I just add a new id, as in:
>
> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>
> it adds 663 tasks and the query does not end.
>
> If I write the query with IN (), like
>
> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>
> the query is again fast.
>
> How can I get the 'execution plan' of the query?
>
> And also, how can I kill the long-running submitted tasks?
>
> Thanks all!
>



-- 
---
Takeshi Yamamuro


Spark SQL and number of task

2016-08-04 Thread Marco Colombo
Hi all, I have a question on how Hive + Spark handle data.

I've started a new HiveContext and I'm extracting data from Cassandra.
I've configured spark.sql.shuffle.partitions=10.
Now, I have the following query:

select d.id, avg(d.avg) from v_points d where id=90 group by id;

I see that 10 tasks are submitted and execution is fast. Every id in that
table has 2000 samples.

But if I just add a new id, as in:

select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;

it adds 663 tasks and the query does not end.

If I write the query with IN (), like

select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;

the query is again fast.
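For this single-column case the OR and IN predicates are logically equivalent, so the rewrite is safe; a quick sanity check with made-up rows (plain Python, hypothetical data, not the real v_points table):

```python
# WHERE id=90 OR id=2 selects exactly the same rows as WHERE id IN (90, 2);
# only the physical plan (and thus the pushdown) can differ.
v_points = [{"id": i, "avg": i * 0.5} for i in range(100)] * 3  # toy rows

or_rows = [r for r in v_points if r["id"] == 90 or r["id"] == 2]
in_rows = [r for r in v_points if r["id"] in (90, 2)]

assert or_rows == in_rows   # identical result set
assert len(in_rows) == 6
```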

How can I get the 'execution plan' of the query?

And also, how can I kill the long-running submitted tasks?

Thanks all!