Thanks everybody for the advice on this. I attached YourKit and found that CPU time split roughly 70% in Parquet/LZO reading and 30% in applying the filter predicate. Those seem like reasonable things for it to be spending time on, so it may really just be a case of needing more hardware to cope with that volume of rows. That's not such a problem, as the cluster wasn't exactly huge for testing - just a couple of nodes.
Further, we've not been making use of the partitioning support for Parquet data, which would give us a simple way to control how much historical data a query has to sift through. It turns out we're already writing our data as <type>/<timestamp>/<parquet file>; we just missed the "date=" naming convention - d'oh! At least that means a fairly simple rename script should get us out of trouble!
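Something like this rough, untested sketch against the Hadoop FileSystem API is what I have in mind (the bucket name is hypothetical, and on S3 a "rename" is really a copy followed by a delete):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object AddDatePrefix {
  def main(args: Array[String]): Unit = {
    // Root of the existing <type>/<timestamp>/<parquet file> layout.
    val root = new Path("s3n://my-bucket/events") // hypothetical location
    val fs   = root.getFileSystem(new Configuration())

    // Walk <type> directories, then the <timestamp> directories inside each.
    for (typeDir <- fs.listStatus(root) if typeDir.isDirectory;
         tsDir   <- fs.listStatus(typeDir.getPath) if tsDir.isDirectory) {
      val name = tsDir.getPath.getName
      if (!name.startsWith("date=")) { // skip anything already renamed
        val target = new Path(typeDir.getPath, s"date=$name")
        println(s"${tsDir.getPath} -> $target")
        fs.rename(tsDir.getPath, target) // on S3 this is a copy + delete
      }
    }
  }
}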
Appreciate everyone's tips, thanks again!

James.

On 23 June 2015 at 17:25, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:

> 64GB in Parquet could be many billions of rows, because of the columnar
> compression. And count distinct by itself is an expensive operation. This
> is not just Spark - even on Presto/Impala you would see performance dip
> with count distincts. And the cluster is not that powerful, either.
>
> The one issue here is that Spark has to sift through all the data to get
> to just a week's worth. To achieve better performance you might want to
> partition the data by date/week; then Spark wouldn't have to sift through
> all the billions of rows to get to the millions it needs to aggregate.
>
> Regards
> Sab
>
> On Tue, Jun 23, 2015 at 4:35 PM, James Aley <james.a...@swiftkey.com> wrote:
>
>> Thanks for the suggestions everyone, appreciate the advice.
>>
>> I tried replacing DISTINCT with the nested GROUP BY, running on 1.4
>> instead of 1.3, replacing the date casts with a "between" operation on
>> the corresponding long constants, and changing COUNT(*) to COUNT(1).
>> None of these seems to have made any remarkable difference in running
>> time for the query.
>>
>> I'll hook up YourKit and see if we can figure out where the CPU time is
>> going, then post back.
>>
>> On 22 June 2015 at 16:01, Yin Huai <yh...@databricks.com> wrote:
>>
>>> Hi James,
>>>
>>> Maybe it's the DISTINCT causing the issue.
>>>
>>> I rewrote the query as follows. Maybe this one can finish faster:
>>>
>>> select
>>>   sum(cnt) as uses,
>>>   count(id) as users
>>> from (
>>>   select
>>>     count(*) as cnt,
>>>     cast(id as string) as id
>>>   from usage_events
>>>   where
>>>     from_unixtime(cast(timestamp_millis/1000 as bigint))
>>>     between '2015-06-09' and '2015-06-16'
>>>   group by cast(id as string)
>>> ) tmp
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>
>>>> Generally (not just in Spark SQL) you should not cast in the WHERE
>>>> part of a SQL query, and it is not necessary in your case. Getting rid
>>>> of the casts in the whole query would also be beneficial.
>>>>
>>>> On Mon, 22 June 2015 at 17:29, James Aley <james.a...@swiftkey.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> A colleague of mine ran the following Spark SQL query:
>>>>>
>>>>> select
>>>>>   count(*) as uses,
>>>>>   count(distinct cast(id as string)) as users
>>>>> from usage_events
>>>>> where
>>>>>   from_unixtime(cast(timestamp_millis/1000 as bigint))
>>>>>   between '2015-06-09' and '2015-06-16'
>>>>>
>>>>> The table contains billions of rows, but totals only 64GB of data
>>>>> across ~30 separate files, stored as Parquet with LZO compression in S3.
>>>>>
>>>>> Of the referenced columns:
>>>>>
>>>>> * id is Binary, which we cast to a String so that we can DISTINCT by
>>>>> it. (I was already told this will improve in a later release, in a
>>>>> separate thread.)
>>>>> * timestamp_millis is a long, containing a unix timestamp with
>>>>> millisecond resolution.
>>>>>
>>>>> This took nearly 2 hours to run on a 5-node cluster of r3.xlarge EC2
>>>>> instances, using 20 executors, each with 4GB of memory. I can see from
>>>>> monitoring tools that CPU usage is at 100% on all nodes, but incoming
>>>>> network traffic is quite low at 2.5MB/s, suggesting to me that the job
>>>>> is CPU-bound.
>>>>>
>>>>> Does that seem slow? Can anyone offer any ideas, just from glancing at
>>>>> the query, as to why it might be? We'll profile it in the meantime and
>>>>> post back if we find anything ourselves.
>>>>>
>>>>> A side issue: I've found that this query, and others, sometimes
>>>>> complete but return no results. There is no error that I can see in
>>>>> the logs, and Spark reports the job as successful, but the connected
>>>>> JDBC client (SQLWorkbenchJ in this case) just sits there waiting
>>>>> forever. I did a quick Google and couldn't find anyone else reporting
>>>>> similar issues.
>>>>>
>>>>> Many thanks,
>>>>>
>>>>> James.
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
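Pulling the thread's suggestions together - Yin's GROUP BY rewrite, Jörn's advice to keep casts out of the WHERE clause, and a filter on the new date partition column - the query could end up looking something like this untested sketch against Spark 1.4's HiveContext. The partition column name, the string format of its values, and the epoch-millisecond constants (UTC midnights for 2015-06-09 and 2015-06-16) are all assumptions here:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object UsageCounts {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("usage-counts"))
    val ctx = new HiveContext(sc)

    // 2015-06-09 00:00 and 2015-06-16 00:00 UTC, in epoch milliseconds,
    // so the predicate needs no from_unixtime()/cast() per row.
    val lo = 1433808000000L
    val hi = 1434412800000L

    val result = ctx.sql(s"""
      select sum(cnt) as uses, count(id) as users
      from (
        select count(1) as cnt, cast(id as string) as id
        from usage_events
        where `date` between '2015-06-09' and '2015-06-16'  -- partition pruning (assumed column)
          and timestamp_millis between $lo and $hi          -- cast-free row filter
        group by cast(id as string)
      ) tmp
    """)
    result.show()
  }
}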