Thanks everybody for the advice on this.

I attached YourKit and found that the CPU time split was about 70% in
Parquet/LZO reading and 30% applying the filter predicate. I guess those
are reasonable things for it to be spending time on, and so it really could
just be a case of needing more hardware to cope with that volume of rows.
That's not such a problem, as the cluster wasn't exactly huge when testing
- just a couple of nodes.

Further, we've not been making use of the partitioning support for Parquet
data, which would actually give us a simple way to control how much
historical data we go sifting through. It turns out we're already writing
our data as <type>/<timestamp>/<parquet file>; we just missed the "date="
naming convention - d'oh! At least that means a fairly simple rename script
should get us out of trouble!
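
For reference, this is roughly the layout the rename needs to produce (the
angle-bracket names are placeholders, as above), so that Spark SQL should
pick up "date" as a partition column:

  <type>/date=2015-06-09/<parquet file>
  <type>/date=2015-06-10/<parquet file>
  ...

With that in place, filtering on "date" should let Spark skip the
directories outside the requested range instead of reading every file.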

Appreciate everyone's tips, thanks again!

James.


On 23 June 2015 at 17:25, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:

> 64GB in Parquet could be many billions of rows because of the columnar
> compression. And count distinct by itself is an expensive operation. This
> is not just Spark; even on Presto/Impala you would see performance dip
> with count distincts. And the cluster is not that powerful either.
>
> The one issue here is that Spark has to sift through all the data to get
> to just a week's worth. To achieve better performance you might want to
> partition the data by date/week and then Spark wouldn't have to sift
> through all the billions of rows to get to the millions it needs to
> aggregate.
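>
> For example (a minimal sketch, assuming the files are laid out under a
> "date" partition column, e.g. .../date=2015-06-09/...), the same
> aggregation would only need to touch a week of partitions:
>
> select
>   count(*) as uses,
>   count(distinct cast(id as string)) as users
> from usage_events
> where date >= '2015-06-09' and date < '2015-06-16'
>
> The filter on the partition column should let Spark prune whole
> directories rather than scanning every row to evaluate a timestamp
> predicate.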
>
> Regards
> Sab
>
> On Tue, Jun 23, 2015 at 4:35 PM, James Aley <james.a...@swiftkey.com>
> wrote:
>
>> Thanks for the suggestions everyone, appreciate the advice.
>>
>> I tried replacing the DISTINCT with the nested GROUP BY, running on 1.4
>> instead of 1.3, replacing the date casts with a "between" operation on the
>> corresponding long constants, and changing COUNT(*) to COUNT(1). None of
>> these seems to have made any noticeable difference in running time for
>> the query.
>>
>> I'll hook up YourKit and see if we can figure out where the CPU time is
>> going, then post back.
>>
>> On 22 June 2015 at 16:01, Yin Huai <yh...@databricks.com> wrote:
>>
>>> Hi James,
>>>
>>> Maybe it's the DISTINCT causing the issue.
>>>
>>> I rewrote the query as follows. Maybe this one can finish faster.
>>>
>>> select
>>>   sum(cnt) as uses,
>>>   count(id) as users
>>> from (
>>>   select
>>>     count(*) cnt,
>>>     cast(id as string) as id
>>>   from usage_events
>>>   where
>>>     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
>>> '2015-06-09' and '2015-06-16'
>>>   group by cast(id as string)
>>> ) tmp
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Generally (not just specific to Spark SQL), you should not cast in the
>>>> WHERE part of a SQL query. It is also not necessary in your case. Getting
>>>> rid of the casts in the whole query would be beneficial as well.
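>>>>
>>>> For example (just a sketch, and assuming the timestamps are in UTC), the
>>>> filter could compare the long column directly against precomputed
>>>> millisecond constants:
>>>>
>>>> where
>>>>   timestamp_millis >= 1433808000000 -- 2015-06-09 00:00:00 UTC
>>>>   and timestamp_millis < 1434412800000 -- 2015-06-16 00:00:00 UTC
>>>>
>>>> That keeps the predicate a plain comparison on the stored type, with no
>>>> per-row from_unixtime or cast work.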
>>>>
>>>> On Mon, 22 June 2015 at 17:29, James Aley <james.a...@swiftkey.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> A colleague of mine ran the following Spark SQL query:
>>>>>
>>>>> select
>>>>>   count(*) as uses,
>>>>>   count (distinct cast(id as string)) as users
>>>>> from usage_events
>>>>> where
>>>>>   from_unixtime(cast(timestamp_millis/1000 as bigint))
>>>>> between '2015-06-09' and '2015-06-16'
>>>>>
>>>>> The table contains billions of rows, but totals only 64GB of data
>>>>> across ~30 separate files, which are stored as Parquet with LZO
>>>>> compression in S3.
>>>>>
>>>>> From the referenced columns:
>>>>>
>>>>> * id is Binary, which we cast to a String so that we can DISTINCT by
>>>>> it. (I was already told this will improve in a later release, in a
>>>>> separate thread.)
>>>>> * timestamp_millis is a long containing a Unix timestamp with
>>>>> millisecond resolution.
>>>>>
>>>>> This took nearly 2 hours to run on a 5-node cluster of r3.xlarge EC2
>>>>> instances, using 20 executors, each with 4GB memory. I can see from
>>>>> monitoring tools that CPU usage is at 100% on all nodes, but incoming
>>>>> network traffic seems a bit low at 2.5MB/s, suggesting to me that this
>>>>> is CPU-bound.
>>>>>
>>>>> Does that seem slow? Can anyone glancing at the query offer any ideas
>>>>> as to why it might be? We'll profile it in the meantime and post back
>>>>> if we find anything ourselves.
>>>>>
>>>>> A side issue - I've found that this query, and others, sometimes
>>>>> completes but doesn't return any results. There's no error that I can
>>>>> see in the logs, and Spark reports the job as successful, but the
>>>>> connected JDBC client (SQLWorkbenchJ in this case) just sits there
>>>>> waiting forever. I did a quick Google and couldn't find anyone else
>>>>> having similar issues.
>>>>>
>>>>>
>>>>> Many thanks,
>>>>>
>>>>> James.
>>>>>
>>>>
>>>
>>
>
>
