bq. in a NoSQL DB such as HBase

+1 :-)
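For what it's worth, a minimal sketch of what that could look like: pick an HBase row key that encodes the natural identity of an event, and writing the same event again becomes an idempotent overwrite instead of a duplicate. This is only an illustration under assumptions: the table name "events", the column family "d", the key layout, and the `events` DataFrame (the data parsed from JSON) are all made up for the example.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

events.foreachPartition { rows =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("events"))   // hypothetical table name
  try {
    rows.foreach { r =>
      // Duplicates share UserID/EventKey/TimeStamp, so they map to the same row key.
      val rowKey = Bytes.toBytes(
        s"${r.getAs[String]("UserID")}|${r.getAs[String]("EventKey")}|${r.getAs[Any]("TimeStamp")}")
      val put = new Put(rowKey)
      put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("EventType"),
        Bytes.toBytes(r.getAs[String]("EventType")))
      put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("MetaData"),
        Bytes.toBytes(r.getAs[String]("MetaData")))
      table.put(put)   // re-inserting a duplicate just rewrites the same row
    }
  } finally {
    table.close()
    conn.close()
  }
}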
On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

> One option you may want to explore is writing the event table into a NoSQL
> DB such as HBase. One inherent problem in your approach is that you always
> need to load either the full data set or a defined number of partitions to
> see whether the event has already arrived (and there is no guarantee it is
> foolproof, but it leads to unnecessary loading in most cases).
>
> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Hey,
>>
>> Thank you for the answer. I checked the settings you mentioned; they are
>> all correct. I noticed that in the job there are always only 200 reducers
>> for the shuffle read; I believe this is set by the SQL shuffle parallelism.
>>
>> In the doc, it mentions:
>>
>> - Automatically determine the number of reducers for joins and
>> groupbys: Currently in Spark SQL, you need to control the degree of
>> parallelism post-shuffle using "SET
>> spark.sql.shuffle.partitions=[num_tasks];".
>>
>> What would be the ideal number for this setting? Is it based on the
>> hardware of the cluster?
>>
>> Thanks,
>> Gavin
>>
>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>>
>>> - I assume your Parquet files are compressed. Gzip or Snappy?
>>> - What Spark version did you use? It seems at least 1.4. If you use
>>> Spark SQL and Tungsten, you might get better performance, but Spark 1.5.2
>>> gave me a wrong result when the data was about 300~400 GB, just for a
>>> simple group-by and aggregate.
>>> - Did you use Kryo serialization?
>>> - You should have spark.shuffle.compress=true; verify it.
>>> - How many tasks did you use? spark.default.parallelism=?
>>> - What about this:
>>>   - Read the data day by day.
>>>   - Compute a bucket id from the timestamp, e.g., the date and hour.
>>>   - Write into different buckets (you probably need a special
>>>   writer to write data efficiently without shuffling the data).
>>>   - Run distinct for each bucket. Because each bucket is small, Spark
>>>   can get it done faster than having everything in one run.
>>> - I think using groupBy(userId, timestamp) might be better than
>>> distinct. I guess distinct() will compare every field.
>>>
>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> And the most frequent operation I am going to do is to find the UserIDs
>>>> that have certain events, then retrieve all the events associated with
>>>> each UserID.
>>>>
>>>> In this case, how should I partition to speed up the process?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Ted,
>>>>>
>>>>> The Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>>> MetaData. I just parse it from JSON and save it as Parquet; I did not
>>>>> change the partitioning.
>>>>>
>>>>> Annoyingly, each day's incoming event data has duplicates with the
>>>>> other days'. The same event could show up in Day 1 and Day 2 and
>>>>> probably Day 3.
>>>>>
>>>>> I only want to keep a single Event table, but each day comes with so
>>>>> many duplicates.
>>>>>
>>>>> Is there a way I could just insert into Parquet and, if a duplicate is
>>>>> found, simply ignore it?
>>>>>
>>>>> Thanks,
>>>>> Gavin
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Is your Parquet data source partitioned by date?
>>>>>>
>>>>>> Can you dedup within partitions?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I tried it on three days' data. The total input is only 980 GB, but
>>>>>>> the shuffle write data is about 6.2 TB; the job then failed during
>>>>>>> the shuffle read step, which should be another 6.2 TB of shuffle
>>>>>>> read.
>>>>>>>
>>>>>>> I think that to dedup, the shuffling cannot be avoided. Is there
>>>>>>> anything I could do to stabilize this process?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I have each day's Event table and want to merge them into a single
>>>>>>>> Event table, but there are so many duplicates among each day's data.
>>>>>>>>
>>>>>>>> I use Parquet as the data source. What I am doing now is
>>>>>>>>
>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>>> parquet file")
>>>>>>>>
>>>>>>>> Each day's events are stored in their own Parquet file.
>>>>>>>>
>>>>>>>> But it fails at stage 2, which keeps losing the connection to one
>>>>>>>> executor. I guess this is due to a memory issue.
>>>>>>>>
>>>>>>>> Any suggestion on how to do this efficiently?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Gavin
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
> --
> Best Regards,
> Ayan Guha
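As a footnote to the thread above, here is a minimal Spark 1.x sketch that pulls the Spark-side suggestions together: dedup on the event's key columns rather than a full-row distinct(), raise spark.sql.shuffle.partitions above the default 200, and write the result partitioned so later per-day or per-user reads touch only a slice of the data. The paths, column names, and the 2000-partition figure below are assumptions for illustration, not values verified on this workload.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, to_date}

val sc = new SparkContext(new SparkConf().setAppName("event-dedup"))
val sqlContext = new SQLContext(sc)

// Default is 200 post-shuffle tasks; with a multi-TB shuffle, aim for roughly
// 100-200 MB per task (2000 here is a placeholder, not a measured value).
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

val day1 = sqlContext.read.parquet("/events/day1")   // hypothetical paths
val day2 = sqlContext.read.parquet("/events/day2")

val merged = day1.unionAll(day2)
  // Dedup on the natural key only, instead of distinct() comparing every
  // field including the wide MetaData column.
  .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))

merged
  .withColumn("event_date", to_date(col("TimeStamp")))   // assumes TimeStamp is a timestamp column
  .write
  .partitionBy("event_date")
  .parquet("/events/merged")

For the "all events of one UserID" lookups, partitioning the output by a hash bucket of UserID instead of (or in addition to) the date would prune the scan to a few files per user, at the cost of each daily merge touching every bucket.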
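And a sketch of the bucketing idea Benyi and Ted describe: route incoming data into buckets keyed by the event's own timestamp, so an event's duplicates land in the same bucket no matter which ingestion day they arrive on, and each small bucket can then be deduped without one giant global shuffle. The hour-level bucket format, directory layout, and helper names are hypothetical.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, date_format}

// Append one day's incoming files into event-time buckets; this is a plain
// partitioned write, no shuffle of the existing data.
def appendDayIntoBuckets(sqlContext: SQLContext, dayPath: String): Unit = {
  sqlContext.read.parquet(dayPath)
    .withColumn("bucket", date_format(col("TimeStamp"), "yyyy-MM-dd-HH"))
    .write
    .partitionBy("bucket")
    .mode("append")
    .parquet("/events/bucketed")
}

// Dedup one bucket at a time; each bucket is small, so the shuffle stays small.
def dedupBucket(sqlContext: SQLContext, bucket: String): Unit = {
  sqlContext.read.parquet(s"/events/bucketed/bucket=$bucket")
    .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))
    .write
    .mode("overwrite")
    .parquet(s"/events/deduped/bucket=$bucket")
}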