Ping. Could someone please confirm whether this is an issue or expected behavior? - Swapnil
On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde <swapnilushi...@gmail.com> wrote:
> Hello All,
>
> I am observing some strange results with the aggregateByKey API, which is
> implemented with combineByKey. I am not sure whether this is by design or a bug.
>
> I created this toy example, but the same problem can be observed on large
> datasets as well:
>
> case class ABC(key: Int, c1: Int, c2: Int)
> case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)
>
> // Create the RDD and make sure it has 1 or 2 partitions for this example.
> // With 2 partitions there is a high chance that all values for a key
> // end up in the same partition.
> val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20),
>   ABC(2, 20, 40), ABC(2, 20, 40))).coalesce(2)
>
> Now I run aggregateByKey, grouping by key to sum c1 and c2, but returning
> ABCoutput with a new 'desc' property:
>
> val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>   (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
>     x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final",
>     m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> The above query may return results like this:
> [image: Inline image 1]
>
> That is, for a key whose values are all in the same partition, the
> mergeCombiner function (which returns ABCoutput with desc=final) was not
> invoked.
>
> I expected the mergeCombiner function to be invoked every time, which is
> not happening. Correct me if I am wrong, but is this expected behavior?
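Inline note: my current understanding, which may well be wrong, is that the merge function is only needed when a key has partial results in more than one partition. The sketch below is a simplified, Spark-free model of that idea, not Spark's actual implementation. The names CombinerModel and aggregate are made up for illustration, and each inner Seq stands in for one partition.

```scala
// Simplified model of per-key aggregation across partitions.
// This is an illustration only; it is NOT how Spark is implemented.
case class ABC(key: Int, c1: Int, c2: Int)
case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

object CombinerModel {
  val zero: ABCoutput = ABCoutput(0, "initial", 0, 0)

  // Same shape as the seqOp passed to aggregateByKey in the example above.
  def seqOp(acc: ABCoutput, v: ABC): ABCoutput =
    ABCoutput(acc.key, "intermediate", acc.c1Sum + v.c1, acc.c2Sum + v.c2)

  // Same shape as the combOp (mergeCombiner) in the example above.
  def combOp(m1: ABCoutput, m2: ABCoutput): ABCoutput =
    ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum)

  // Hypothetical helper: each inner Seq models one partition.
  def aggregate(partitions: Seq[Seq[ABC]]): Map[Int, ABCoutput] = {
    // Step 1: inside each partition, fold the values of every key with seqOp.
    val partials: Seq[(Int, ABCoutput)] = partitions.flatMap { part =>
      part.groupBy(_.key).toSeq.map { case (k, vs) => k -> vs.foldLeft(zero)(seqOp) }
    }
    // Step 2: merge the partial results of each key with combOp.
    // reduce never calls combOp when a key has only one partial result.
    partials.groupBy(_._1).map { case (k, ps) => k -> ps.map(_._2).reduce(combOp) }
  }
}
```

In this model, a key confined to one partition comes back with desc = "intermediate", and a key spread over two partitions comes back with desc = "final", which matches what I see in the results above.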
>
> Further debugging shows that it works fine if I create the input RDD with
> more partitions (which increases the chance of rows with the same key
> landing in different partitions):
>
> val b = a.repartition(20).keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>   (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
>     x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final",
>     m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
> [image: Inline image 2]
>
> One more thing to mention: if I make sure my input RDD is already
> partitioned, then it simply runs the aggregation with mapPartitions (here
> <https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
> This makes sense for aggregation, since all values for a given key are in
> the same partition. However, I have some logic in my mergeCombiner function
> that I would like to run, and it won't get invoked.
> Traditional MapReduce allows different combiner and reduce functions, and
> the reduce function is guaranteed to be invoked. I can see that running
> aggregations with no shuffle has performance gains, but the API seems
> confusing/misleading. A user might expect mergeCombiner to be invoked, but
> in reality it isn't. It would be great if the API designers could shed some
> light on this.
>
> import org.apache.spark.HashPartitioner
> val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20))
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
>       x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>     (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final",
>       m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> [image: Inline image 3]
>
> The above examples show this behavior with aggregateByKey, but the same
> thing can be observed with combineByKey as well.
> val b = a.keyBy(x => x.key).combineByKey(
>   (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
>   (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
>     x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final",
>     x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))
>
> [image: Inline image 4]
>
> Please let me know if you need any further information, and correct me if
> my understanding of the API is wrong.
>
> Thanks
> Swapnil
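One possible workaround, sketched under the assumption that the extra logic only needs the already aggregated value for a key: move it out of mergeCombiner into a separate per-key step that runs after the aggregation. On a pair RDD that step could be a mapValues call chained after aggregateByKey. In the sketch below a plain Map stands in for the aggregated RDD so it runs without Spark, and FinalizeStep, finalizeOutput and finalizeAll are hypothetical names.

```scala
// Same output type as in the example above.
case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

object FinalizeStep {
  // Hypothetical helper: logic that must run exactly once per key lives here
  // instead of inside the mergeCombiner function.
  def finalizeOutput(out: ABCoutput): ABCoutput = out.copy(desc = "final")

  // Plain-Map stand-in for chaining .mapValues(finalizeOutput) after the
  // aggregation on a pair RDD. Every key is visited, merged or not.
  def finalizeAll(aggregated: Map[Int, ABCoutput]): Map[Int, ABCoutput] =
    aggregated.map { case (k, v) => k -> finalizeOutput(v) }
}
```

With this split, the merge function only has to combine sums, and the result no longer depends on how the keys happen to be spread across partitions.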