For each element in a dataset, do something with another dataset

2015-09-29 Thread Pieter Hameete
Good day everyone,

I am looking for a good way to do the following:

I have dataset A and dataset B, and for each element in dataset A I would
like to filter dataset B and obtain the size of the result. In short:

*for each element a in A -> B.filter( _ < a.propertyx).count*

Currently I am doing a cross of datasets A and B, making tuples so I can
then filter all the tuples where field2 < field1.propertya, and then group
by field1.id and get the sizes of the groups. However, this is not working
out in practice: when the datasets get larger, some Tasks hang on the CHAIN
Cross -> Filter, probably because there is insufficient memory for the
cross to be completed.
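
Roughly, what I am doing now in the Scala DataSet API looks like the sketch
below (the element types and field names are simplified stand-ins for my
actual data):

import org.apache.flink.api.scala._

case class ElemA(id: Long, propertyX: Long)

def countWithCross(a: DataSet[ElemA], b: DataSet[Long]): DataSet[(Long, Long)] =
  a.cross(b)                                      // materializes |A| x |B| pairs
   .filter { case (ea, eb) => eb < ea.propertyX } // keep pairs with b < a.propertyX
   .map { case (ea, _) => (ea.id, 1L) }
   .groupBy(0)                                    // group by element id of A
   .sum(1)                                        // group sizes = per-element counts
   // NB: elements of A without any matching b produce no tuple at all here.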

Does anyone have a suggestion on how I could make this work, especially
with datasets that are larger than the memory available to an individual
Task?

Thank you in advance for your time :-)

Kind regards,

Pieter Hameete


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Fabian Hueske
Hi Pieter,

cross is indeed too expensive for this task.

If dataset A fits into memory, you can do the following: use a
RichMapPartitionFunction to process dataset B and add dataset A as a
broadcast set. In the open() method, load the broadcasted set, sort it by
a.propertyX, and initialize a long[] for the counts. For each element of
dataset B, do a binary search on the sorted dataset A and increase the
counts of all elements of A whose propertyX is greater than that element of
B (these form a contiguous range next to the found position). After all
elements of dataset B have been processed, return the counts from the
long[].
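
A minimal sketch of what I mean (Scala API; the element type, the field
names, and the broadcast set name "datasetA" are placeholders). Instead of
increasing a whole range of counts for every element of B, it records a
single +1 at the binary search position and derives the final counts with
one running sum at the end, which is equivalent but only O(log |A|) per
element:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class ElemA(id: Long, propertyX: Long)

class CountSmallerThanA extends RichMapPartitionFunction[Long, (Long, Long)] {
  private var sortedA: Array[ElemA] = _
  private var deltas: Array[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Load the broadcasted dataset A and sort it by propertyX.
    sortedA = getRuntimeContext.getBroadcastVariable[ElemA]("datasetA")
      .asScala.toArray.sortBy(_.propertyX)
    deltas = new Array[Long](sortedA.length)
  }

  override def mapPartition(values: java.lang.Iterable[Long],
                            out: Collector[(Long, Long)]): Unit = {
    val it = values.iterator()
    while (it.hasNext) {
      val b = it.next()
      // Find the first position whose propertyX is greater than b;
      // b counts towards that element and every element after it.
      var lo = 0
      var hi = sortedA.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (sortedA(mid).propertyX <= b) lo = mid + 1 else hi = mid
      }
      if (lo < deltas.length) deltas(lo) += 1
    }
    // Turn the deltas into per-element counts with a running sum.
    var running = 0L
    for (i <- sortedA.indices) {
      running += deltas(i)
      out.collect((sortedA(i).id, running))
    }
  }
}

// Each parallel task sees only a partition of B, so the partial counts
// still have to be summed per element id:
// val counts = b.mapPartition(new CountSmallerThanA)
//                .withBroadcastSet(a, "datasetA")
//                .groupBy(0).sum(1)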

If dataset A doesn't fit into memory, things become more cumbersome and we
need to play some tricks with range partitioning...

Let me know if you have questions,
Fabian


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Pieter Hameete
Hi Fabian,

thanks for your tips!

Do you have some pointers for getting started with the 'tricky range
partitioning'? I am quite keen to get this working with large datasets ;-)

Cheers,

Pieter


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Fabian Hueske
The idea is to partition both datasets by range.
Assume your dataset A is [1,2,3,4,5,6] and you create two partitions: p1:
[1,2,3] and p2: [4,5,6].
Each partition is given to a different instance of a MapPartition operator
(this is a bit tricky, because you cannot use a broadcast set here; you
could instead load the corresponding partition in the open() function, from
HDFS for example).

DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
p1 and everything > 3 goes to p2. You can partition a dataset by range
using the partitionCustom() function. The partitioned dataset is given to
the MapPartition operator that loaded the corresponding partition of
dataset A in each task instance.
You do the counting just like before (sorting the partition of dataset A,
binary search, long[]), but add an additional count for the complete
partition (basically count all elements that arrive in the task instance).
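
A custom range partitioner could look like this sketch (the boundary
values are placeholders; in practice you would derive them from what you
know about the value range):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Routes each key to its range: everything <= boundaries(0) goes to
// partition 0, (boundaries(0), boundaries(1)] to partition 1, and so on;
// everything above the last boundary goes to the last partition.
class RangePartitioner(boundaries: Array[Long]) extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int): Int = {
    var p = 0
    while (p < boundaries.length && key > boundaries(p)) p += 1
    p
  }
}

// For the example above (p1: <= 3, p2: > 3):
// val partitionedB = b.partitionCustom(new RangePartitioner(Array(3L)), x => x)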

If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1
would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7].
Now you need to compute the final counts by adding the "all" counts of the
lower partitions to the counts of the higher partitions, i.e., add the
all:5 of p1 to all counts of p2.
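
With the numbers from above, the merge step is just one offset per
partition (plain Scala here, only to illustrate the arithmetic):

// Per-partition results: (partition index, per-element counts, "all" count)
val p1 = (0, Seq((1, 0L), (2, 1L), (3, 3L)), 5L)
val p2 = (1, Seq((4, 0L), (5, 1L), (6, 5L)), 7L)

val parts = Seq(p1, p2).sortBy(_._1)
// Offset of a partition = sum of the "all" counts of all lower partitions.
val offsets = parts.map(_._3).scanLeft(0L)(_ + _)   // List(0, 5, 12)

val finalCounts = parts.zip(offsets).flatMap { case ((_, counts, _), offset) =>
  counts.map { case (elem, c) => (elem, c + offset) }
}
// finalCounts: (1,0), (2,1), (3,3), (4,5), (5,6), (6,10)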

This approach requires knowing the value range and the distribution of the
values, which makes it a bit difficult. I guess you'll get the best
performance if you partition such that you have about equally sized
partitions of dataset B, with the constraint that the corresponding
partitions of A fit into memory.

As I said, it's a bit cumbersome. I hope you could follow my explanation.
Please ask if something is not clear ;-)


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Gábor Gévay
Hello,

Alternatively, if dataset B fits in memory but dataset A doesn't, you can
do it by broadcasting B to a RichMapPartitionFunction on A: in the open
method of mapPartition, you sort B. Then, for each element of A, you do a
binary search in B; the index found by the binary search is exactly the
count you are looking for.
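
A small sketch of this (Scala API; the names are placeholders, and since
nothing has to be emitted after the last element here, a plain
RichMapFunction is already enough):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

case class ElemA(id: Long, propertyX: Long)

class CountSmallerInB extends RichMapFunction[ElemA, (Long, Long)] {
  private var sortedB: Array[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Load and sort the broadcasted dataset B once per task.
    sortedB = getRuntimeContext.getBroadcastVariable[Long]("datasetB")
      .asScala.toArray.sorted
  }

  override def map(a: ElemA): (Long, Long) = {
    // Lower-bound binary search: the insertion index equals the number
    // of elements of B that are strictly smaller than a.propertyX.
    var lo = 0
    var hi = sortedB.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (sortedB(mid) < a.propertyX) lo = mid + 1 else hi = mid
    }
    (a.id, lo.toLong)
  }
}

// val counts = a.map(new CountSmallerInB).withBroadcastSet(b, "datasetB")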

Best,
Gabor


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Pieter Hameete
Hi Gabor, Fabian,

thank you for your suggestions. I am intending to scale up to the point
where I'm sure that both A and B won't fit in memory. I'll see if I can
come up with a nice way to partition the datasets, but if that takes too
much time I'll just have to accept that it won't work on large datasets.
I'll let you know if I manage to work something out, but I won't get to it
until the weekend :-)

Cheers again,

Pieter


Re: For each element in a dataset, do something with another dataset

2015-10-05 Thread Pieter Hameete
Hi Fabian,

I have a question regarding the first approach. Is there a benefit to
choosing a RichMapPartitionFunction over a RichMapFunction in this case? I
assume that each broadcasted dataset is sent only once to each task
manager?

If I were to broadcast dataset B, then I could, for each element a in A,
count the number of elements in B that are smaller than a and output a
tuple in a map operation. This would also save me the step of aggregating
the results?

Kind regards,

Pieter


Re: For each element in a dataset, do something with another dataset

2015-10-05 Thread Fabian Hueske
Hi Pieter,

a FlatMapFunction can only return values when its flatMap() method is
called. However, in your use case, you would like to return values *after*
the function has been called for the last time. This is not possible with a
FlatMapFunction, because you cannot identify the last call.
The MapPartitionFunction, in contrast, is called only once with an iterator
over the whole partition. Hence, you can return values after the iterator
has been fully consumed.
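
To illustrate the difference (the counting body is just an example):

import org.apache.flink.api.common.functions.MapPartitionFunction
import org.apache.flink.util.Collector

// mapPartition() is invoked once with the whole partition, so the
// Collector is still available after the last element has been seen.
class EmitAfterLast extends MapPartitionFunction[Long, Long] {
  override def mapPartition(values: java.lang.Iterable[Long],
                            out: Collector[Long]): Unit = {
    var count = 0L
    val it = values.iterator()
    while (it.hasNext) { it.next(); count += 1 }
    out.collect(count) // emitted only after the iterator is exhausted
  }
}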

The broadcast set is sent only once in both cases.

If it is possible to broadcast dataset B, you can also use a MapFunction
and don't need to store the count values.

Best, Fabian
