I am observing some strange results with aggregateByKey API which is
implemented with combineByKey. Not sure if this is by design or bug -

I created this toy example but same problem can be observed on large
datasets as well -

*case class ABC(key: Int, c1: Int, c2: Int)*
*case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)*

// Create RDD and making sure if has 1 or 2 partitions for this example.
// With 2 partitions there are high chances that same key could be in same
*val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20,
40), ABC(2, 20, 40))).coalece(2)*

Now, I am running aggregateByKey where I am grouping by Key to sum c1 and
c2 but return ABCoutput with new 'desc' property.

*val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0,"initial",0,0))
((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) =>
ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

Above query may return results like this -
[image: Inline image 1]

It means for one of the keys which has all values in same partition didn't
invoke mergeCombiner function which returns ABCoutput with desc=final.

I am expecting mergeCombiner function to be invoked all the time which is
not happening. Correct me if wrong, but is this expected behavior?

Further debugging shows that it works fine if I create input RDD with more
partitions( which increases chances of having rows with same key in
different partitions)

*val b = a.repartition(20).keyBy(x =>
x.key).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC)
=> ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1:
ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum,
[image: Inline image 2]

One more thing to mention - If I make sure my input RDD is partitioned then
it simply runs aggregation with mapPartitions (here
Now, this makes sense in terms of aggregations as all values for given key
are in same partition. However, I have something in my mergeCombiner
function that I would like to run which wont get invoked.
Traditional map reduce allows to have different combiner and reduce
function and it is guaranteed that reduce is always invoked. I can see that
running aggregations with no shuffle has performance gains but API seems to
be confusing/misleading. User might hope that mergeCombiner gets invoked
but in reality it isn't. It will be great if this API designers can shed
some light on this.

*import org.apache.spark.HashPartitioner*
*val b = a.keyBy(x => x.key).partitionBy(new
HashPartitioner(20)).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1:
ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1,
x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key,
"final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

[image: Inline image 3]

Above examples shows this behavior with AggregateByKey but same thing can
be observed with CombineByKey as well.
*val b = a.keyBy(x => x.key).combineByKey( (x: ABC) => ABCoutput(x.key,
"initial", x.c1, x.c2), *
*  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum +
x2.c1, x1.c2Sum+x2.c2),*
*  (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final", x1.c1Sum +
x2.c1Sum, x1.c2Sum+x2.c2Sum))*

*[image: Inline image 4]*

Please let me know if you need any further information and correct me if my
understanding of API is wrong.


