Hello,
I am trying to do a groupBy on 5 attributes to get results in a form like a
pivot table in Microsoft Excel. The keys are the attribute tuples and the
values are double arrays (possibly very large). With the code below I get
correct results, but I would like to optimize it further (I have already
experimented with numPartitions).
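To validate the Spark job on small inputs, it may help to have a local reference of the same roll-up. A minimal sketch with plain Scala collections (no Spark), mirroring the five keys the flatMap below emits; `LocalPivot`, `rollupKeys`, and `pivot` are hypothetical names:

```scala
object LocalPivot {
  type Attrs = (String, String, String, String, String)

  // The five roll-up keys a row contributes to, from coarsest to finest
  // (same key shapes as the flatMap in the Spark code).
  def rollupKeys(attrs: Attrs): Seq[Any] = attrs match {
    case (t1, t2, t3, t4, t5) =>
      Seq(t1 + "_top", (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
  }

  // Group every row under each of its keys and sum the arrays element-wise.
  def pivot(rows: Seq[(Attrs, Array[Double])]): Map[Any, Array[Double]] =
    rows
      .flatMap { case (attrs, values) => rollupKeys(attrs).map(key => (key, values)) }
      .groupBy(_._1)
      .map { case (key, pairs) =>
        key -> pairs.map(_._2).reduce((x, y) => x.zip(y).map { case (a, b) => a + b })
      }
}
```

Comparing `pivot` on a handful of rows with `groupToVaRRDD.collect()` (before the final percentile step) is a cheap correctness check after each optimization.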
The two issues I see are:
1. flatMap is needed to expand the key tuples, but it also duplicates the
values, and since the values are large this increases the shuffle input size
for reduceByKey. Is there a way to avoid the duplication?
2. reduceByKey adds two arrays element-wise and creates a new array for
every addition. Is there a way to reduce without creating a new array
every time (similar to what accumulators do)?
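On point 1, one observation that may help: reduceByKey already combines values map-side by default, so each map task writes at most one summed array per distinct key rather than one per input record. The remaining blow-up comes from the five copies emitted per row before that combine. Because the hierarchy is nested, a parent's sum equals the sum of its children's sums, so one possible alternative is to reduce at the finest level first and then derive each coarser level from the already-aggregated level below it. The sketch assumes keys are encoded as a `Vector[String]` path (so `Vector("A-1")` stands in for `"A-1_top"`); `Rollup`, `Path`, `add`, and `rollup` are hypothetical names, and it has not been benchmarked against the original.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark < 1.3; harmless later
import org.apache.spark.rdd.RDD

object Rollup {
  // Hypothetical key encoding: the attribute path, e.g. Vector("A-1", "B-1").
  type Path = Vector[String]

  // Element-wise sum that allocates a fresh result (safe for reduceByKey).
  def add(x: Array[Double], y: Array[Double]): Array[Double] = {
    require(x.length == y.length)
    val out = new Array[Double](x.length)
    var i = 0
    while (i < x.length) { out(i) = x(i) + y(i); i += 1 }
    out
  }

  // Reduce the full-depth keys once, then build each coarser level from the
  // level below by dropping the last attribute and reducing again, so every
  // shuffle after the first moves only aggregated arrays.
  def rollup(leaves: RDD[(Path, Array[Double])], depth: Int): RDD[(Path, Array[Double])] = {
    // cache(): each level feeds both the union and the next level's map.
    var level: RDD[(Path, Array[Double])] = leaves.reduceByKey(add _).cache()
    var all: RDD[(Path, Array[Double])] = level
    for (_ <- 1 until depth) {
      level = level.map { case (path, v) => (path.init, v) }.reduceByKey(add _).cache()
      all = all.union(level)
    }
    all
  }
}
```

The trade-off is `depth` shuffles instead of one, each over aggregated data; whether that wins likely depends on how many raw rows share each leaf key.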
Below are the sample code, the query plan, and the output.
Thanks.
import org.apache.spark.SparkContext._ // pair-RDD functions such as reduceByKey (needed before Spark 1.3)

// Sample input: 5-attribute key -> array of doubles (sc is an existing SparkContext)
val attributeToFloatArrayRDD = sc.parallelize(Array(
  ("A-1", "B-2", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
))

val groupToVaRRDD = attributeToFloatArrayRDD
  // Expand each row into its five roll-up keys: "A_top", (A,B), (A,B,C), (A,B,C,D), (A,B,C,D,E)
  .flatMap { case ((t1, t2, t3, t4, t5), values) =>
    Array(t1 + "_top", (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
      .map(key => (key, values))
  }
  // Sum all arrays that share a key, element-wise
  .reduceByKey { (x, y) =>
    require(x.size == y.size)
    (x, y).zipped.map(_ + _)
  }
  // Per key, keep the (size/20)-th smallest value of the summed array (the lower 5% tail)
  .map { case (key, values) =>
    (key, values.sorted.take(values.size / 20).last)
  }
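On point 2, one option (a sketch, not tested at scale) is `combineByKey`, whose `mergeValue` and `mergeCombiners` functions are allowed to mutate and return their first argument. The first array for each key is copied in `createCombiner` because, in the code above, the flatMap emits the same array object under five keys, so summing into it directly would corrupt the other groups. `InPlaceSum`, `Vec`, and `sumByKey` are hypothetical names.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark < 1.3; harmless later
import org.apache.spark.rdd.RDD

object InPlaceSum {
  type Vec = Array[Double]

  // First value seen for a key in a partition: copy it, since input arrays
  // may be shared between keys and must not be modified.
  val createCombiner: Vec => Vec = v => v.clone()

  // Add v into acc element-wise, reusing acc's storage (no new array).
  val mergeValue: (Vec, Vec) => Vec = (acc, v) => {
    require(acc.length == v.length)
    var i = 0
    while (i < acc.length) { acc(i) += v(i); i += 1 }
    acc
  }

  // Two partial sums are both combiner-owned, so fold the second into the first.
  val mergeCombiners: (Vec, Vec) => Vec = mergeValue

  def sumByKey[K: ClassTag](rdd: RDD[(K, Vec)]): RDD[(K, Vec)] =
    rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
}
```

In the pipeline above, `.reduceByKey(...)` would become `InPlaceSum.sumByKey(expanded)`, where `expanded` is the RDD produced by the flatMap step. That allocates roughly one array per key per partition instead of one per addition; arrays deserialized on the reduce side are still allocated by the shuffle itself.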
==== Query Plan
(16) MappedRDD[12] at map at GroupByTest.scala:81 []
| ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []
+-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []
| ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []
==== Output
GroupBy VaR
(A-2,B-1) 54.75
(A-2,B-1,C-1,D-2) 54.75
(A-1,B-1) 149.25
(A-1,B-1,C-1,D-1,E-1) 149.25
(A-3,B-3,C-1) 49.75
(A-3,B-3) 49.75
(A-4,B-3,C-1,D-1,E-1) 57.75
(A-2,B-1,C-1) 54.75
(A-1,B-2,C-1,D-1,E-1) 49.75
(A-1,B-1,C-1,D-1) 149.25
(A-3,B-3,C-1,D-1,E-2) 49.75
(A-1,B-2,C-1) 49.75
(A-3,B-3,C-1,D-1) 49.75
(A-4,B-3) 57.75
(A-1,B-1,C-1) 149.25
A-1_top 199.0
(A-4,B-3,C-1,D-1) 57.75
A-2_top 54.75
(A-1,B-2) 49.75
(A-4,B-3,C-1) 57.75
A-3_top 49.75
A-4_top 57.75
(A-2,B-1,C-1,D-2,E-1) 54.75
(A-1,B-2,C-1,D-1) 49.75
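The VaR column can be checked by hand. `(0.0 to 1000.0 by 0.25)` has 4001 elements, so `size / 20` is 200 and the statistic is the 200th smallest value, 199 × 0.25 = 49.75. Summing three identical arrays gives 149.25 for the (A-1,B-1,...) groups, four give 199.0 for A-1_top, and the arrays starting at 5.0 and 8.0 give 54.75 and 57.75. A small standalone check of that arithmetic, with hypothetical helper names and `Array.tabulate` used in place of the Double range:

```scala
object VaRCheck {
  // Same statistic as the final map in the Spark job: the (size/20)-th smallest value.
  def lowerTail(values: Array[Double]): Double =
    values.sorted.take(values.size / 20).last

  // Element-wise sum of several equally sized arrays.
  def sumAll(arrays: Seq[Array[Double]]): Array[Double] =
    arrays.reduce((x, y) => x.zip(y).map { case (a, b) => a + b })

  // offset, offset + 0.25, ..., offset + 1000.0 (4001 values).
  def grid(offset: Double): Array[Double] =
    Array.tabulate(4001)(i => offset + i * 0.25)
}
```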