Thank you. And speaking of compression, is there a big difference in performance between gzip and snappy? And why does Parquet use gzip by default?

Thanks.
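For what it's worth: snappy generally compresses and decompresses much faster than gzip but produces somewhat larger files, so the choice is a speed/size trade-off. The codec Spark uses when writing Parquet is controlled by spark.sql.parquet.compression.codec, which defaults to gzip in the 1.x releases. A minimal sketch of switching it, assuming a Spark 1.x SQLContext (the DataFrame name and paths are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("parquet-codec").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Spark 1.x default is gzip; snappy is usually faster at the cost of larger files.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

    val events = sqlContext.read.parquet("/events/2016-01-08")    // hypothetical path
    events.write.parquet("/events/2016-01-08-snappy")             // hypothetical path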
On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Cycling old bits:
> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>
> Gavin:
> Which release of HBase did you play with?
>
> HBase has been evolving and is getting more stable.
>
> Cheers
>
> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> I used to maintain an HBase cluster. The experience with it was not happy.
>>
>> I just tried querying each day's data first and deduping against the smaller
>> set, and the performance is acceptable. So I guess I will use this method.
>>
>> Again, could anyone give advice about:
>>
>> - Automatically determine the number of reducers for joins and
>>   groupbys: Currently in Spark SQL, you need to control the degree of
>>   parallelism post-shuffle using “SET
>>   spark.sql.shuffle.partitions=[num_tasks];”.
>>
>> Thanks.
>>
>> Gavin
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. in a NoSQL db such as HBase
>>>
>>> +1 :-)
>>>
>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> One option you may want to explore is writing the event table to a NoSQL
>>>> db such as HBase. One inherent problem in your approach is that you always
>>>> need to load either the full data set or a defined number of partitions to
>>>> see whether an event has already arrived (and there is no guarantee this is
>>>> foolproof, and it leads to unnecessary loading in most cases).
>>>>
>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Thank you for the answer. I checked the settings you mentioned and they
>>>>> are all correct. I noticed that the job always uses only 200 reducers for
>>>>> the shuffle read; I believe that comes from the SQL shuffle parallelism
>>>>> setting.
>>>>>
>>>>> In the doc, it mentions:
>>>>>
>>>>> - Automatically determine the number of reducers for joins and
>>>>>   groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>   parallelism post-shuffle using “SET
>>>>>   spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>> What would be the ideal number for this setting? Is it based on the
>>>>> hardware of the cluster?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Gavin
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>>>>>
>>>>>> - I assume your Parquet files are compressed. Gzip or Snappy?
>>>>>> - What Spark version did you use? It seems to be at least 1.4. If you
>>>>>>   use spark-sql and Tungsten, you might get better performance, but Spark
>>>>>>   1.5.2 gave me a wrong result when the data was about 300~400GB, just
>>>>>>   for a simple group-by and aggregate.
>>>>>> - Did you use Kryo serialization?
>>>>>> - You should have spark.shuffle.compress=true; verify it.
>>>>>> - How many tasks did you use? spark.default.parallelism=?
>>>>>> - What about this:
>>>>>>   - Read the data day by day.
>>>>>>   - Compute a bucket id from the timestamp, e.g., the date and hour.
>>>>>>   - Write into different buckets (you probably need a special writer to
>>>>>>     write data efficiently without shuffling the data).
>>>>>>   - Run distinct for each bucket. Because each bucket is small, Spark can
>>>>>>     get it done faster than doing everything in one run.
>>>>>> - I think using groupBy (userId, timestamp) might be better than
>>>>>>   distinct. I guess distinct() will compare every field.
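A rough sketch of what that dedup might look like with the Spark 1.5/1.6 DataFrame API: dropDuplicates on the key columns is effectively the groupBy-style dedup Benyi describes, and setConf raises the 200-task post-shuffle default. The paths, column names, and numbers below are assumptions, not values from this thread:

    // sqlContext here is the one provided by spark-shell / the application.
    // Raise post-shuffle parallelism above the default of 200 before the heavy job.
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

    val day1 = sqlContext.read.parquet("/events/2016-01-06")
    val day2 = sqlContext.read.parquet("/events/2016-01-07")
    val day3 = sqlContext.read.parquet("/events/2016-01-08")

    // Dedup on the key columns rather than whole-row distinct(), so rows are
    // compared on (UserID, EventKey, TimeStamp) instead of every field.
    val merged = day1.unionAll(day2).unionAll(day3)
      .dropDuplicates(Seq("UserID", "EventKey", "TimeStamp"))

    merged.write.parquet("/events/merged")

As for the "ideal" value of spark.sql.shuffle.partitions, there is no single answer: a common starting point is a small multiple of the total executor cores, increased until each shuffle partition is down to a few hundred MB.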
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>
>>>>>>> And the most frequent operation I am going to do is find the UserIDs that
>>>>>>> have certain events, then retrieve all the events associated with each
>>>>>>> UserID.
>>>>>>>
>>>>>>> In this case, how should I partition to speed up the process?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey Ted,
>>>>>>>>
>>>>>>>> The Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>>>>>> MetaData. I just parse it from JSON and save it as Parquet; I did not
>>>>>>>> change the partitioning.
>>>>>>>>
>>>>>>>> Annoyingly, every day's incoming Event data contains duplicates. The
>>>>>>>> same event could show up in Day1 and Day2 and probably Day3.
>>>>>>>>
>>>>>>>> I only want to keep a single Event table, and each day brings in so
>>>>>>>> many duplicates.
>>>>>>>>
>>>>>>>> Is there a way I could just insert into Parquet and, if a duplicate is
>>>>>>>> found, simply ignore it?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Gavin
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Is your Parquet data source partitioned by date?
>>>>>>>>>
>>>>>>>>> Can you dedup within partitions?
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I tried it on three days' data. The total input is only 980GB, but
>>>>>>>>>> the shuffle write is about 6.2TB, and then the job failed during the
>>>>>>>>>> shuffle read step, which should be another 6.2TB of shuffle read.
>>>>>>>>>>
>>>>>>>>>> I think that to dedup, the shuffle cannot be avoided. Is there
>>>>>>>>>> anything I could do to stabilize this process?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey,
>>>>>>>>>>>
>>>>>>>>>>> I get each day's Event table and want to merge them into a single
>>>>>>>>>>> Event table, but there are so many duplicates among each day's data.
>>>>>>>>>>>
>>>>>>>>>>> I use Parquet as the data source. What I am doing now is
>>>>>>>>>>>
>>>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>>>>>>>>>>
>>>>>>>>>>> Each day's events are stored in their own Parquet file.
>>>>>>>>>>>
>>>>>>>>>>> But it fails at stage 2, which keeps losing the connection to one
>>>>>>>>>>> executor. I guess this is due to a memory issue.
>>>>>>>>>>>
>>>>>>>>>>> Any suggestion on how to do this efficiently?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Gavin
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
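Going back to the question above about how to partition so that "retrieve all the events for a UserID" is fast: UserID itself usually has far too high a cardinality to be a Parquet partition column directly, but a hash bucket of it can work. A rough sketch, again with hypothetical column names, bucket count, and paths; "merged" is the deduped DataFrame from the earlier sketch:

    import org.apache.spark.sql.functions.{col, udf}

    val numBuckets = 256
    val userBucket = udf((id: String) => math.abs(id.hashCode) % numBuckets)

    merged
      .withColumn("user_bucket", userBucket(col("UserID")))
      .write
      .partitionBy("user_bucket")        // DataFrameWriter.partitionBy, Spark 1.4+
      .parquet("/events/by_user_bucket")

    // A per-user lookup then only reads one bucket's directory:
    val someUser = "user-123"                               // hypothetical id
    val bucket = math.abs(someUser.hashCode) % numBuckets
    val userEvents = sqlContext.read.parquet("/events/by_user_bucket")
      .filter(col("user_bucket") === bucket && col("UserID") === someUser)

The bucket count is a trade-off: enough buckets that one user's events sit in a small slice of the data, but few enough that each bucket still holds reasonably large Parquet files.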