Re: Spark aggregateByKey Issues

2015-09-15 Thread biyan900116
Hi Alexis:

Of course, it’s very useful to me, especially the part about what to do after
the sort is done.

And I still have one question:
how do I choose a decent number of partitions, if it does not need to equal
the number of keys?

> On 2015-09-15, at 15:41, Alexis Gillain wrote:
> 
> Sorry, I made a typo in my previous message: you can't sort by (yourKey, date)
> and still have all the records of a key in the same partition.
> 
> So you can sortByKey(yourKey) with a decent number of partitions (it doesn't
> have to be the number of keys). After that the records of a key are grouped in
> one partition, but not sorted by date.
> 
> Then you use mapPartitions to copy each partition into a List, sort it by
> (yourKey, date), and use that list to compute whatever you want. The main
> issue is that a partition must fit in memory.
> 
> Hope this helps.
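A minimal sketch of the approach described above (the Event type, the field
names, and the running-sum logic are illustrative assumptions, not from the
thread; each partition is assumed to fit in memory):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Event(date: Long, amount: Int)

object SortThenMapPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    val byKey: RDD[(String, Event)] = sc.parallelize(Seq(
      "a" -> Event(2L, 10), "a" -> Event(1L, 5), "b" -> Event(1L, 7)
    ))

    val processed = byKey
      .sortByKey(ascending = true, numPartitions = 4)      // range-partitions by key
      .mapPartitions { iter =>
        // Copy the partition into memory (it must fit), do the secondary sort
        // by (key, date), then walk the list carrying the running state.
        iter.toList
          .sortBy { case (k, e) => (k, e.date) }
          .scanLeft(("", 0)) { case ((prevKey, runningTotal), (k, e)) =>
            val base = if (k == prevKey) runningTotal else 0  // reset the state on a new key
            (k, base + e.amount)
          }
          .drop(1)                                           // drop the scanLeft seed
          .iterator
      }

    processed.collect().foreach(println)
    sc.stop()
  }
}

The sortByKey step installs a range partitioner on the key, so all records of
one key land in the same partition; the in-memory sort inside mapPartitions
then restores the per-key date order before the stateful pass.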



Re: Spark aggregateByKey Issues

2015-09-15 Thread Alexis Gillain
That's the tricky part.

If the volume of data is always the same, you can find a good value by testing.
If the volume can vary, you can divide the number of records in your file by
the number of records you think can fit in memory. Either way, the distribution
of your records can still affect the result.
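
A tiny sketch of that heuristic (the record counts are made-up assumptions;
adjust them to your data and executor memory):

object PartitionCount {
  // Decent number of partitions = total records / records that fit in memory per partition.
  def decentNumPartitions(totalRecords: Long, recordsThatFitInMemory: Long): Int =
    math.max(1, math.ceil(totalRecords.toDouble / recordsThatFitInMemory).toInt)

  def main(args: Array[String]): Unit =
    println(decentNumPartitions(totalRecords = 2000000000L, recordsThatFitInMemory = 5000000L)) // 400
}

The result would then be passed as the numPartitions argument of sortByKey.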

About aggregate: indeed, the combOp won't be executed if you sort the records first.



-- 
Alexis GILLAIN


Re: Spark aggregateByKey Issues

2015-09-14 Thread Alexis Gillain
I'm not sure about what you want to do.

You should try to sort the RDD by (yourKey, date); it ensures that all the
records of a key are in the same partition.

Your problem after that is that you want to aggregate only on yourKey, and if
you change the key of the sorted RDD you lose the partitioning.

Depending on the size of the result, you can either use an aggregate that
builds a map of results by yourKey, or use mapPartitions to output an RDD (in
that case set the number of partitions high enough that each partition fits in
memory).

See
http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
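
A minimal sketch of the "aggregate building a map of results by key" option
(the key and value types and the per-key sum are illustrative assumptions, and
it ignores the ordering-by-date requirement):

import org.apache.spark.{SparkConf, SparkContext}

object AggregateToMap {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 5))

    // seqOp folds the records of one partition into a Map[key, result];
    // combOp merges the per-partition maps on the driver.
    val results: Map[String, Int] = rdd.aggregate(Map.empty[String, Int])(
      (acc, rec) => acc + (rec._1 -> (acc.getOrElse(rec._1, 0) + rec._2)),
      (m1, m2)   => m2.foldLeft(m1) { case (m, (k, v)) => m + (k -> (m.getOrElse(k, 0) + v)) }
    )

    println(results)   // e.g. Map(a -> 3, b -> 5)
    sc.stop()
  }
}

Note that the whole result map comes back to the driver, so this only works
when the result is small enough, which is the size caveat above.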




-- 
Alexis GILLAIN


Spark aggregateByKey Issues

2015-09-14 Thread 毕岩
Hi:


There is one case about using a reduce operation, like this:


I need to reduce a large dataset made up of billions of key-value records,
as follows:

First, group by key; the records with the same key need to be in order of
one field in the value called “date”.

Second, within the records of one key, every operation on a record depends
on the result of processing the previous one, and the first record starts
from the original state.


Some attempts:

1. groupByKey + map: the performance is bad, CPU goes to 100%, so executor
heartbeats time out and errors like “Lost executor” are thrown, or the
application hangs…


2. aggregateByKey:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

About aggregateByKey: are all the records with the same key in the same
partition? Is the zeroValue applied to the first record among all records
with the same key, or once per partition? If it is the former, the combOp
function does nothing!

I tried the overload with the second “numPartitions” parameter and passed the
number of keys to it, but the number of keys is so large that all the tasks
got killed.


What should I do in this case?

I'm asking for advice online...

Thank you.
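
For reference, a minimal runnable sketch of the aggregateByKey call above
(local-mode Spark and made-up data are assumptions). In Spark, the zeroValue
seeds the accumulator for a key within each partition, and combOp merges the
per-partition accumulators when a key's records span more than one partition:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))

    // Key "a" is deliberately spread over two partitions.
    val rdd = sc.parallelize(Seq("a" -> 1, "a" -> 2, "a" -> 3, "b" -> 4), numSlices = 2)

    val sums = rdd.aggregateByKey(0)(
      (acc, v) => acc + v,   // seqOp: runs within a partition, starting from zeroValue
      (a, b)   => a + b      // combOp: merges the per-partition accumulators of a key
    )

    sums.collect().foreach(println)   // (a,6) and (b,4)
    sc.stop()
  }
}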


Re: Spark aggregateByKey Issues

2015-09-14 Thread biyan900116
Hi Alexis:

Thank you for your reply.

My case is that each operation on a record depends on a value that is set while
processing the previous record.

So your advice is that I can use “sortByKey”, and “sortByKey” will put all
records with the same key in one partition. Do I need to pass the
“numPartitions” parameter, or does it still do that even if I don't?

If that works and I then add “aggregate” to handle my case, I think the combOp
function in the parameter list of the aggregate API won't be executed. Is my
understanding wrong?
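
A quick way to check this (a sketch assuming local-mode Spark): sortByKey
always installs a RangePartitioner whether or not you pass numPartitions; the
parameter only controls how many partitions the key range is split into.

import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyPartitioning {
  def main(args: Array[String]): Unit = {
    val sc    = new SparkContext(new SparkConf().setAppName("check").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 3)

    val defaultSort  = pairs.sortByKey()         // numPartitions defaults to pairs.partitions.length
    val explicitSort = pairs.sortByKey(true, 2)  // explicit partition count

    println(defaultSort.partitioner)             // Some(org.apache.spark.RangePartitioner@...)
    println(explicitSort.partitioner)            // also a RangePartitioner, just with fewer ranges
    sc.stop()
  }
}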
  
