Hi Gabor, Fabian,

thank you for your suggestions. I am intending to scale up so that I'm sure
that both A and B won't fit in memory. I'll see if I can come up with a
nice way to partition the datasets, but if that takes too much time I'll
just have to accept that it won't work on large datasets. I'll let you know
if I manage to work something out, but I won't get to it until the weekend
:-)

Cheers again,

Pieter

2015-09-30 12:28 GMT+02:00 Gábor Gévay <gga...@gmail.com>:

> Hello,
>
> Alternatively, if dataset B fits in memory but dataset A doesn't,
> then you can do it by broadcasting B to a RichMapPartitionFunction
> on A:
> In the open method of mapPartition, you sort B. Then, for each element
> of A, you do a binary search in B, and look at the index found by the
> binary search, which will be the count that you are looking for.
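>
> In the Scala DataSet API, a rough sketch could look like this (the
> element type Long, the variable names a and b, and the broadcast name
> "B" are just placeholders, not something from your code):
>
> import org.apache.flink.api.common.functions.RichMapPartitionFunction
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.util.Collector
> import scala.collection.JavaConverters._
>
> class CountSmallerInB extends RichMapPartitionFunction[Long, (Long, Long)] {
>   private var sortedB: Array[Long] = _
>
>   override def open(conf: Configuration): Unit = {
>     // Load the broadcast copy of B once per task instance and sort it.
>     sortedB = getRuntimeContext.getBroadcastVariable[Long]("B")
>       .asScala.toArray.sorted
>   }
>
>   override def mapPartition(as: java.lang.Iterable[Long],
>                             out: Collector[(Long, Long)]): Unit = {
>     for (a <- as.asScala) {
>       // Lower-bound binary search: index of the first element of B that
>       // is >= a, which equals the number of elements of B smaller than a.
>       var lo = 0
>       var hi = sortedB.length
>       while (lo < hi) {
>         val mid = (lo + hi) >>> 1
>         if (sortedB(mid) < a) lo = mid + 1 else hi = mid
>       }
>       out.collect((a, lo.toLong))
>     }
>   }
> }
>
> // usage: a.mapPartition(new CountSmallerInB).withBroadcastSet(b, "B")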
>
> Best,
> Gabor
>
>
>
> 2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> > The idea is to partition both datasets by range.
> > Assume your dataset A is [1,2,3,4,5,6]; you create two partitions, p1:
> > [1,2,3] and p2: [4,5,6].
> > Each partition is given to a different instance of a MapPartition
> > operator (this is a bit tricky, because you cannot use broadcastSet.
> > You could load the corresponding partition in the open() function,
> > for example from HDFS).
> >
> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
> > p1, everything > 3 goes to p2. You can partition a dataset by range
> > using the partitionCustom() function (see the sketch below). The
> > partitioned dataset is given to the mapPartition operator that loaded a
> > partition of dataset A in each task instance.
> > You do the counting just like before (sorting the partition of dataset
> > A, binary search, long[]), but add an additional count for the complete
> > partition (basically count all elements that arrive in the task
> > instance).
> >
> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1
> > would be [1:0, 2:1, 3:3, all:5] and for p2: [4:0, 5:1, 6:5, all:7].
> > Now you need to compute the final count by adding the "all" counts of
> > the lower partitions to the counts of the higher partitions, i.e., add
> > all:5 of p1 to all counts for p2.
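> >
> > A minimal sketch of the range partitioning step, assuming the split
> > points are known up front (here just the single boundary 3 from the
> > example) and that B is a DataSet[Long]:
> >
> > import org.apache.flink.api.common.functions.Partitioner
> > import org.apache.flink.api.scala._
> >
> > // Routes each value to the range it falls into: values <= boundaries(0)
> > // go to partition 0, values <= boundaries(1) to partition 1, and so on.
> > class RangePartitioner(boundaries: Array[Long]) extends Partitioner[Long] {
> >   override def partition(key: Long, numPartitions: Int): Int =
> >     math.min(boundaries.count(_ < key), numPartitions - 1)
> > }
> >
> > // Elements <= 3 end up in the same task as A's partition [1,2,3],
> > // elements > 3 with A's partition [4,5,6]:
> > // val partitionedB = b.partitionCustom(new RangePartitioner(Array(3L)), x => x)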
> >
> > This approach requires knowing the value range and distribution of the
> > values, which makes it a bit difficult. I guess you'll get the best
> > performance if you partition in a way that gives you roughly equally
> > sized partitions of dataset B, with the constraint that the
> > corresponding partitions of A fit into memory.
> >
> > As I said, it's a bit cumbersome. I hope you could follow my
> > explanation. Please ask if something is not clear ;-)
> >
> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
> >>
> >> Hi Fabian,
> >>
> >> thanks for your tips!
> >>
> >> Do you have some pointers for getting started with the 'tricky range
> >> partitioning'? I am quite keen to get this working with large
> >> datasets ;-)
> >>
> >> Cheers,
> >>
> >> Pieter
> >>
> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>
> >>> Hi Pieter,
> >>>
> >>> cross is indeed too expensive for this task.
> >>>
> >>> If dataset A fits into memory, you can do the following: Use a
> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
> >>> broadcastSet. In the open method of mapPartition, you can load the
> >>> broadcast set, sort it by a.propertyX, and initialize a long[] for
> >>> the counts. For each element of dataset B, you do a binary search on
> >>> the sorted dataset A and increase the counts of all positions with a
> >>> larger propertyX. After all elements of dataset B have been
> >>> processed, return the counts from the long[].
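> >>>
> >>> A rough sketch of this in the Scala DataSet API. It assumes A is a
> >>> DataSet[(String, Long)] of (id, propertyX) pairs, B is a
> >>> DataSet[Long], and the broadcast variable is called "A" (all just
> >>> placeholders). Instead of touching every count slot per element of B,
> >>> it increments one slot and builds the final counts with a single
> >>> prefix sum at the end:
> >>>
> >>> import org.apache.flink.api.common.functions.RichMapPartitionFunction
> >>> import org.apache.flink.api.scala._
> >>> import org.apache.flink.configuration.Configuration
> >>> import org.apache.flink.util.Collector
> >>> import scala.collection.JavaConverters._
> >>>
> >>> class CountsPerA extends RichMapPartitionFunction[Long, (String, Long)] {
> >>>   private var sortedA: Array[(String, Long)] = _
> >>>
> >>>   override def open(conf: Configuration): Unit = {
> >>>     // Load the broadcast copy of A and sort it by propertyX.
> >>>     sortedA = getRuntimeContext.getBroadcastVariable[(String, Long)]("A")
> >>>       .asScala.toArray.sortBy(_._2)
> >>>   }
> >>>
> >>>   override def mapPartition(bs: java.lang.Iterable[Long],
> >>>                             out: Collector[(String, Long)]): Unit = {
> >>>     val counts = new Array[Long](sortedA.length)
> >>>     for (b <- bs.asScala) {
> >>>       // First position in the sorted A whose propertyX is > b.
> >>>       var lo = 0
> >>>       var hi = sortedA.length
> >>>       while (lo < hi) {
> >>>         val mid = (lo + hi) >>> 1
> >>>         if (sortedA(mid)._2 <= b) lo = mid + 1 else hi = mid
> >>>       }
> >>>       if (lo < counts.length) counts(lo) += 1
> >>>     }
> >>>     // Prefix sum: a b counted at position j also counts for every
> >>>     // later position (larger propertyX).
> >>>     var j = 1
> >>>     while (j < counts.length) { counts(j) += counts(j - 1); j += 1 }
> >>>     for (i <- sortedA.indices) out.collect((sortedA(i)._1, counts(i)))
> >>>   }
> >>> }
> >>>
> >>> // Each parallel task emits partial counts, so sum them per id:
> >>> // b.mapPartition(new CountsPerA).withBroadcastSet(a, "A")
> >>> //   .groupBy(0).sum(1)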
> >>>
> >>> If dataset A doesn't fit into memory, things become more cumbersome
> >>> and we need to play some tricks with range partitioning...
> >>>
> >>> Let me know if you have questions,
> >>> Fabian
> >>>
> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
> >>>>
> >>>> Good day everyone,
> >>>>
> >>>> I am looking for a good way to do the following:
> >>>>
> >>>> I have dataset A and dataset B, and for each element in dataset A I
> >>>> would like to filter dataset B and obtain the size of the result. In
> >>>> short:
> >>>>
> >>>> for each element a in A -> B.filter( _ < a.propertyx).count
> >>>>
> >>>> Currently I am doing a cross of datasets A and B, making tuples so I
> >>>> can then filter all the tuples where field2 < field1.propertya, group
> >>>> by field1.id, and get the sizes of the groups. However, this is not
> >>>> working out in practice. When the datasets get larger, some tasks
> >>>> hang on the CHAIN Cross -> Filter, probably because there is
> >>>> insufficient memory for the cross to be completed?
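> >>>>
> >>>> In (simplified) code, the current approach is roughly this, with A as
> >>>> (id, propertyX) pairs and B as plain values:
> >>>>
> >>>> // a: DataSet[(String, Long)], b: DataSet[Long]
> >>>> val counts = a.cross(b)
> >>>>   .filter { case ((_, propertyX), bVal) => bVal < propertyX }
> >>>>   .map { case ((id, _), _) => (id, 1L) }   // one hit per matching pair
> >>>>   .groupBy(0).sum(1)                       // group size per id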
> >>>>
> >>>> Does anyone have a suggestion on how I could make this work,
> >>>> especially with datasets that are larger than the memory available to
> >>>> a single task?
> >>>>
> >>>> Thank you in advance for your time :-)
> >>>>
> >>>> Kind regards,
> >>>>
> >>>> Pieter Hameete
> >>>
> >>>
> >>
> >
>
