Thanks a lot. That was my suspicion.
What puzzles me is that in the OR case the pushdown also appears in the
explain plan from Hive, while it is effectively not performed by the
client.
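
One client-side workaround is to rewrite an OR of equalities on the same
column into IN before the query is submitted. A minimal sketch in plain
Python, assuming only the filter shapes printed in the plans (EqualTo, Or,
In); the classes and the helper `or_to_in` are hypothetical stand-ins, not
the Spark or connector API:

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union

# Plain-Python stand-ins for the filter shapes printed in the plans
# (EqualTo, Or, In). Illustrative classes only, not the Spark API.


@dataclass(frozen=True)
class EqualTo:
    attribute: str
    value: object


@dataclass(frozen=True)
class In:
    attribute: str
    values: Tuple[object, ...]


@dataclass(frozen=True)
class Or:
    left: "Filter"
    right: "Filter"


Filter = Union[EqualTo, In, Or]


def or_to_in(f: Filter) -> Optional[In]:
    """Collapse an OR tree of equalities on ONE column into a single In.

    Returns None when the tree mixes columns or contains anything else,
    because such a predicate cannot be written as a single IN list.
    """
    if isinstance(f, EqualTo):
        return In(f.attribute, (f.value,))
    if isinstance(f, In):
        return f
    if isinstance(f, Or):
        left, right = or_to_in(f.left), or_to_in(f.right)
        if left is None or right is None or left.attribute != right.attribute:
            return None
        # Keep first-seen order and drop duplicate values.
        merged = tuple(dict.fromkeys(left.values + right.values))
        return In(left.attribute, merged)
    return None


if __name__ == "__main__":
    # Filter shape taken from the OR plan in this thread.
    print(or_to_in(Or(EqualTo("id", 94), EqualTo("id", 2))))
```

An OR across different columns returns None here, which is the general
case where no single IN list exists.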

Regards

2016-08-04 15:37 GMT+02:00 Yong Zhang <java8...@hotmail.com>:

> The 2 plans look similar, but there is a big difference between them if
> you also consider that your source is in fact a NoSQL DB, like C*.
>
>
> The OR plan has "Filter ((id#0L = 94) || (id#0L = 2))", which means the
> filter is indeed happening on the Spark side, instead of on the C* side.
> That means that to fulfill your query, Spark has to load all the data back
> from C* (imagine you have millions of IDs), filter most of it out, and keep
> only the data with id 94 and 2. The IO is the bottleneck in this case, as a
> huge amount of data needs to be transferred from C* to Spark.
>
>
> In the other case, the ids are pushed down to C* (and in most cases, the
> id is the primary key (or at least the partition key)), so C* will find the
> data for these 2 ids very fast and return only the matching data to
> Spark, which then does the aggregation on a very small amount of data. That
> is why the performance differs so much between these 2 cases.
>
>
> You can argue that the Spark-Cassandra connector should be smarter in
> handling the "OR" case. But in general, OR is not easy to handle, as in
> most cases "OR" will be applied to different columns, instead of only to
> IDs as in this case.
>
>
> If your query uses partition keys in C*, always use them with either
> "=" or "in". If not, then you have to wait for the data transfer from C* to
> Spark. Spark + C* allows you to run any ad-hoc query, but you need to know
> the underlying price paid.
>
>
> Yong
>
>
> ------------------------------
> *From:* Takeshi Yamamuro <linguin....@gmail.com>
> *Sent:* Thursday, August 4, 2016 8:18 AM
> *To:* Marco Colombo
> *Cc:* user
> *Subject:* Re: Spark SQL and number of task
>
> It seems the performance difference comes from `CassandraSourceRelation`.
> I'm not familiar with the implementation, though; I guess the `IN` filter
> is pushed down into the datasource and the other is not.
>
> You'd be better off checking the performance metrics in the web UI.
>
> // maropu
>
> On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo <ing.marco.colo...@gmail.com
> > wrote:
>
>> Ok, thanks.
>> The 2 plans are very similar
>>
>> with in condition
>> == Physical Plan ==
>> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
>> +- TungstenExchange hashpartitioning(id#0L,10), None
>>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
>>       +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]
>>
>> with the or condition
>> == Physical Plan ==
>> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
>> +- TungstenExchange hashpartitioning(id#0L,10), None
>>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
>>       +- Filter ((id#0L = 94) || (id#0L = 2))
>>          +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]
>>
>>
>> Filters are pushed down, so I cannot understand why it performs such a
>> big data extraction in the OR case. It's like a full table scan.
>>
>> Any advice?
>>
>> Thanks!
>>
>>
>> 2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro <linguin....@gmail.com>:
>>
>>> Hi,
>>>
>>> Please type `sqlCtx.sql("select * .... ").explain` to show execution
>>> plans.
>>> Also, you can kill jobs from webUI.
>>>
>>> // maropu
>>>
>>>
>>> On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <
>>> ing.marco.colo...@gmail.com> wrote:
>>>
>>>> Hi all, I have a question about how Hive and Spark handle data.
>>>>
>>>> I've started a new HiveContext and I'm extracting data from Cassandra.
>>>> I've configured spark.sql.shuffle.partitions=10.
>>>> Now, I have the following query:
>>>>
>>>> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>>>>
>>>> I see that 10 tasks are submitted and execution is fast. Every id in
>>>> that table has 2000 samples.
>>>>
>>>> But if I just add a new id, as:
>>>>
>>>> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by
>>>> id;
>>>>
>>>> 663 tasks are submitted and the query does not finish.
>>>>
>>>> If I write the query with in (), like
>>>>
>>>> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>>>>
>>>> the query is fast again.
>>>>
>>>> How can I get the 'execution plan' of the query?
>>>>
>>>> And also, how can I kill the long running submitted tasks?
>>>>
>>>> Thanks all!
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Ing. Marco Colombo
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Ing. Marco Colombo
