Benyi: bq. Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple group-by and aggregate
Can you reproduce the above using Spark 1.6.0? Thanks.

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

> - I assume your Parquet files are compressed. Gzip or Snappy?
> - What Spark version did you use? It seems to be at least 1.4. If you use
>   spark-sql and Tungsten, you might get better performance, but Spark 1.5.2
>   gave me a wrong result when the data was about 300~400GB, just for a
>   simple group-by and aggregate.
> - Did you use Kryo serialization?
> - You should have spark.shuffle.compress=true; verify it.
> - How many tasks did you use? spark.default.parallelism=?
> - What about this:
>   - Read the data day by day.
>   - Compute a bucket id from the timestamp, e.g., the date and hour.
>   - Write into different buckets (you probably need a special writer to
>     write data efficiently without shuffling the data).
>   - Run distinct for each bucket. Because each bucket is small, Spark can
>     get it done faster than having everything in one run.
> - I think using groupBy(userId, timestamp) might be better than distinct.
>   I guess distinct() will compare every field.
>
> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> And the most frequent operation I am going to do is find the UserIDs that
>> have certain events, then retrieve all the events associated with each
>> UserID.
>>
>> In this case, how should I partition to speed up the process?
>>
>> Thanks.
>>
>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Hey Ted,
>>>
>>> The Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>> MetaData. I just parse it from JSON and save it as Parquet; I did not
>>> change the partitioning.
>>>
>>> Annoyingly, each day's incoming Event data contains duplicates of the
>>> other days': the same event can show up on Day 1, Day 2, and probably
>>> Day 3.
>>>
>>> I only want to keep a single Event table, yet each day brings so many
>>> duplicates.
>>>
>>> Is there a way I could just insert into Parquet and, if a duplicate is
>>> found, ignore it?
>>>
>>> Thanks,
>>> Gavin
>>>
>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Is your Parquet data source partitioned by date?
>>>>
>>>> Can you dedup within partitions?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> I tried it on three days' data. The total input is only 980GB, but the
>>>>> shuffle write is about 6.2TB, and the job failed during the shuffle
>>>>> read step, which would be another 6.2TB of shuffle read.
>>>>>
>>>>> I think that to dedup, the shuffle cannot be avoided. Is there
>>>>> anything I could do to stabilize this process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I get an Event table every day and want to merge them into a single
>>>>>> Event table, but there are many duplicates among the days' data.
>>>>>>
>>>>>> I use Parquet as the data source. What I am doing now is:
>>>>>>
>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")
>>>>>>
>>>>>> Each day's Events are stored in their own Parquet file.
>>>>>>
>>>>>> But it fails at stage 2, which keeps losing the connection to one
>>>>>> executor. I guess this is due to a memory issue.
>>>>>>
>>>>>> Any suggestion on how to do this efficiently?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
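For concreteness, a minimal sketch of the configuration knobs Benyi asks about (Kryo serialization, shuffle compression, parallelism), assuming a standalone Scala driver; the numeric values are placeholders to tune, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative settings only; tune the numbers for your cluster.
    val conf = new SparkConf()
      .setAppName("event-dedup")
      // Kryo serialization, as Benyi suggests checking.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Compress shuffle output (the default is already true; verify it).
      .set("spark.shuffle.compress", "true")
      // Partitions used by RDD shuffles when none is given explicitly.
      .set("spark.default.parallelism", "2000")
      // DataFrame shuffles (distinct, groupBy, join) use this one instead;
      // the default of 200 is often too low for inputs of hundreds of GB.
      .set("spark.sql.shuffle.partitions", "2000")
    val sc = new SparkContext(conf)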
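A sketch of Benyi's bucketed approach in the 1.5/1.6 DataFrame API, assuming a Spark shell with sqlContext in scope, the column names from Gavin's schema, a timestamp-typed TimeStamp column, and hypothetical paths. partitionBy on the writer stands in for Benyi's "special writer" (it writes one directory per bucket without shuffling), and dropDuplicates on the identifying columns plays the role of groupBy(userId, timestamp), avoiding distinct()'s compare-every-field behavior:

    import org.apache.spark.sql.functions._

    // Hypothetical input paths; one Parquet directory per day.
    val events = sqlContext.read.parquet("/data/events/day1", "/data/events/day2")

    // 1. Compute a bucket id from the timestamp (here: date and hour) and
    //    write the raw data partitioned by it: one directory per bucket,
    //    no shuffle.
    events
      .withColumn("bucket", date_format(col("TimeStamp"), "yyyy-MM-dd-HH"))
      .write.partitionBy("bucket").parquet("/data/events/bucketed")

    // 2. Dedup bucket by bucket; each bucket is small, so each shuffle is
    //    cheap, and duplicates of the same event share a TimeStamp and
    //    therefore land in the same bucket.
    val buckets = Seq("2016-01-06-00", "2016-01-06-01") // illustrative
    for (b <- buckets) {
      sqlContext.read.parquet(s"/data/events/bucketed/bucket=$b")
        .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))
        .write.parquet(s"/data/events/deduped/bucket=$b")
    }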
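On the "insert and ignore duplicates" question: Parquet has no upsert in Spark 1.x, so one workaround is to anti-join each new day against the keys already in the merged table and append only the unseen rows. Spark 1.5/1.6 has no left_anti join type, so this sketch emulates it with a left outer join plus a null filter; the paths and the choice of key columns are illustrative:

    import org.apache.spark.sql.functions._

    // Keys already present in the merged table, renamed to avoid
    // ambiguity after the join.
    val existing = sqlContext.read.parquet("/data/events/merged")
      .select(col("UserID").as("e_uid"), col("EventKey").as("e_key"),
              col("TimeStamp").as("e_ts"))

    // Dedup the new day against itself first.
    val newDay = sqlContext.read.parquet("/data/events/day3")
      .dropDuplicates(Seq("UserID", "EventKey", "TimeStamp"))

    // Left outer join on the event identity; rows with no match on the
    // right side are the genuinely new events.
    val unseen = newDay
      .join(existing,
        newDay("UserID") === col("e_uid") &&
          newDay("EventKey") === col("e_key") &&
          newDay("TimeStamp") === col("e_ts"),
        "left_outer")
      .filter(col("e_uid").isNull)
      .select(newDay.columns.map(newDay(_)): _*)

    unseen.write.mode("append").parquet("/data/events/merged")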
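For Gavin's retrieval pattern (find a UserID, then fetch all its events), partitioning directly by UserID would create one directory per user, which Parquet handles badly. A common workaround, sketched here under the assumption that UserID is a string, is to hash the id into a fixed number of buckets and partition by the bucket, so a per-user query only scans one directory:

    import org.apache.spark.sql.functions._

    val numBuckets = 256 // arbitrary; pick for your data volume

    // crc32 is available as a SQL function from Spark 1.5; pmod keeps the
    // bucket id non-negative.
    val byUser = sqlContext.read.parquet("/data/events/deduped")
      .withColumn("uid_bucket", pmod(crc32(col("UserID")), lit(numBuckets)))

    byUser.write.partitionBy("uid_bucket").parquet("/data/events/by_user")

    // Lookup: recompute the bucket on the driver (java.util.zip.CRC32 over
    // UTF-8 bytes should match Spark's crc32, but verify on your data),
    // then let partition pruning limit the scan to that one directory.
    def bucketOf(uid: String): Long = {
      val crc = new java.util.zip.CRC32()
      crc.update(uid.getBytes("UTF-8"))
      crc.getValue % numBuckets
    }
    val uid = "some-user-id"
    val oneUser = sqlContext.read.parquet("/data/events/by_user")
      .filter(col("uid_bucket") === bucketOf(uid) && col("UserID") === uid)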