One option you may want to explore is writing the event table to a NoSQL
database such as HBase. An inherent problem with your approach is that you
always need to load either the full data set or a defined number of
partitions to check whether an event has already arrived (and even that is
not guaranteed to be foolproof, while causing unnecessary loading in most
cases).
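
As a rough illustration of the key-value idea (this is not HBase client
code): if each event is written under a row key built from the fields that
identify it, a repeated event overwrites the same row, so nothing already
stored has to be loaded to check for duplicates. The sketch below uses a
plain Python dict as a stand-in for the table. The choice of key fields
(UserID, EventType, EventKey, TimeStamp), the "|" separator, and the helper
names row_key and put_events are assumptions made for illustration only.

```python
# A dict stands in for a key-value table such as HBase (assumption: one row per key).
# Assumption: an event is identified by (UserID, EventType, EventKey, TimeStamp).
KEY_FIELDS = ("UserID", "EventType", "EventKey", "TimeStamp")


def row_key(event):
    """Build a row key from the fields assumed to identify an event."""
    return "|".join(str(event[field]) for field in KEY_FIELDS)


def put_events(table, events):
    """Write events with put semantics: a duplicate overwrites the same row."""
    for event in events:
        table[row_key(event)] = event
    return table


day1 = [
    {"UserID": 1, "EventType": "click", "EventKey": "a", "TimeStamp": 100, "MetaData": "x"},
    {"UserID": 2, "EventType": "view", "EventKey": "b", "TimeStamp": 101, "MetaData": "y"},
]
day2 = [
    # The same event as in day1, arriving again on the next day.
    {"UserID": 1, "EventType": "click", "EventKey": "a", "TimeStamp": 100, "MetaData": "x"},
    {"UserID": 3, "EventType": "click", "EventKey": "c", "TimeStamp": 200, "MetaData": "z"},
]

table = {}
put_events(table, day1)
put_events(table, day2)  # the repeated event lands on the existing row
```

Putting UserID first in the key is a deliberate choice for this sketch: in a
store that keeps rows sorted by key, one user's events would sit next to
each other, which suits the "retrieve all events for a UserID" lookup
described later in the thread. A real key would also need a separator or
encoding that cannot appear inside the field values.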

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Hey,
> Thank you for the answer. I checked the settings you mentioned; they are all
> correct.  I noticed that in the job there are always only 200 reducers for
> shuffle read. I believe this comes from the SQL shuffle parallelism setting.
>
> In the doc, it mentions:
>
>    - Automatically determine the number of reducers for joins and
>    groupbys: Currently in Spark SQL, you need to control the degree of
>    parallelism post-shuffle using “SET
>    spark.sql.shuffle.partitions=[num_tasks];”.
>
>
> What would be the ideal number for this setting? Is it based on the
> hardware of cluster?
>
>
> Thanks,
>
> Gavin
>
> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>
>>
>>    - I assume your parquet files are compressed. Gzip or Snappy?
>>    - What Spark version did you use? It seems to be at least 1.4. If you use
>>    spark-sql and Tungsten, you might get better performance, but Spark 1.5.2
>>    gave me a wrong result when the data was about 300~400GB, just for a
>>    simple group-by and aggregate.
>>    - Did you use Kryo serialization?
>>    - You should have spark.shuffle.compress=true; verify it.
>>    - How many tasks did you use? spark.default.parallelism=?
>>    - What about this:
>>       - Read the data day by day.
>>       - Compute a bucket id from the timestamp, e.g., the date and hour.
>>       - Write into different buckets (you probably need a special writer
>>       to write data efficiently without shuffling the data).
>>       - Run distinct on each bucket. Because each bucket is small, Spark
>>       can get it done faster than processing everything in one run.
>>       - I think using groupBy (userId, timestamp) might be better than
>>       distinct. I guess distinct() will compare every field.
>>
>>
>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> And the most frequent operation I am going to do is find the UserIDs that
>>> have some events, then retrieve all the events associated with each UserID.
>>>
>>> In this case, how should I partition to speed up the process?
>>>
>>> Thanks.
>>>
>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> hey Ted,
>>>>
>>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>>> the partition.
>>>>
>>>> Annoyingly, each day's incoming Event data has duplicates of events from
>>>> other days.  The same event could show up in Day1 and Day2 and probably
>>>> Day3.
>>>>
>>>> I only want to keep a single Event table, and each day brings so many
>>>> duplicates.
>>>>
>>>> Is there a way I could just insert into Parquet and if duplicate found,
>>>> just ignore?
>>>>
>>>> Thanks,
>>>> Gavin
>>>>
>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Is your Parquet data source partitioned by date ?
>>>>>
>>>>> Can you dedup within partitions ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I tried it on three days' data.  The total input is only 980GB, but the
>>>>>> shuffle write is about 6.2TB, and then the job failed during the shuffle
>>>>>> read step, which should be another 6.2TB of shuffle read.
>>>>>>
>>>>>> I think that to dedup, the shuffling cannot be avoided. Is there anything
>>>>>> I could do to stabilize this process?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> I have each day's Event table and want to merge them into a single
>>>>>>> Event table. But there are so many duplicates among the days' data.
>>>>>>>
>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>
>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>> parquet file").
>>>>>>>
>>>>>>> Each day's Event data is stored in its own Parquet file.
>>>>>>>
>>>>>>> But it failed at stage 2, which keeps losing the connection to one
>>>>>>> executor. I guess this is due to a memory issue.
>>>>>>>
>>>>>>> Any suggestions on how to do this efficiently?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Gavin
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha
