Hello All

I am observing some strange results with the aggregateByKey API, which is
implemented on top of combineByKey. I am not sure if this is by design or a bug -

I created this toy example, but the same problem can be observed on large
datasets as well -

case class ABC(key: Int, c1: Int, c2: Int)
case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

// Create an RDD and make sure it has only 1 or 2 partitions for this example.
// With 2 partitions there is a high chance that all values for the same key
// end up in the same partition.
val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20),
  ABC(2, 20, 40), ABC(2, 20, 40))).coalesce(2)

Now, I am running aggregateByKey, grouping by key to sum c1 and c2 while
returning an ABCoutput with a new 'desc' property.

val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

The above query may return results like this:
[Inline image 1: output where a key whose values are all in one partition has desc = "intermediate"]

It means that for a key whose values are all in the same partition, the
mergeCombiner function (the one that returns ABCoutput with desc = "final")
is never invoked.

I was expecting the mergeCombiner function to be invoked every time, which is
not happening. Correct me if I am wrong, but is this expected behavior?
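
To make the invocation pattern visible without a screenshot, here is a
stripped-down sketch (assuming a SparkContext named sc) where the result
string simply records which function ran:

// Stripped-down sketch: the result string records which functions ran.
// seqOp fires for every value inside a partition; combOp should only fire
// when partial results from different partitions are merged.
val single = sc.parallelize(Seq((1, 10), (1, 10)), numSlices = 1)
val traced = single.aggregateByKey("zero")(
  (acc, v) => s"seqOp($acc, $v)",  // within-partition step
  (a, b) => s"combOp($a, $b)")     // cross-partition step
traced.collect()
// expected: Array((1, "seqOp(seqOp(zero, 10), 10)")) -- no combOp at all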

Further debugging shows that it works fine if I create the input RDD with more
partitions (which increases the chance that rows with the same key end up in
different partitions):

val b = a.repartition(20).keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
[Inline image 2: output showing desc = "final"]

One more thing to mention: if I make sure my input RDD is already partitioned,
then the aggregation simply runs with mapPartitions (here
<https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
This makes sense for the aggregation itself, since all values for a given key
are in the same partition. However, I have logic in my mergeCombiner function
that I would like to run, and it never gets invoked (see the paraphrased
sketch of the linked code after the example below).
Traditional MapReduce lets you have different combiner and reduce functions
and guarantees that reduce is always invoked. I can see that running the
aggregation without a shuffle has performance gains, but the API seems
confusing/misleading: a user might expect mergeCombiner to be invoked, but in
reality it isn't. It would be great if the API designers could shed some
light on this.

import org.apache.spark.HashPartitioner
val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20)).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

[Inline image 3: output showing desc = "intermediate", since no shuffle is performed]
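
My reading of the code the link above points to (combineByKeyWithClassTag in
PairRDDFunctions, Spark 2.0.1) is roughly the following; this is a simplified
paraphrase, not the exact source:

// Simplified paraphrase of the branch in combineByKeyWithClassTag:
// when the input RDD already has the requested partitioner, Spark only runs
// combineValuesByKey (createCombiner/mergeValue) per partition; mergeCombiners
// is only used on the ShuffledRDD path.
if (self.partitioner == Some(partitioner)) {
  self.mapPartitions(iter => {
    val context = TaskContext.get()
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  }, preservesPartitioning = true)
} else {
  new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}

So whenever the shuffle is skipped, mergeCombiners can never be called, which
matches what I see.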

The above examples show this behavior with aggregateByKey, but the same thing
can be observed with combineByKey as well:
val b = a.keyBy(x => x.key).combineByKey(
  (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))

[Inline image 4: combineByKey output showing the same behavior]
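
For now, the only workaround I see for a guaranteed finishing step is to apply
it separately after the aggregation, for example with mapValues. This is just
a sketch using the classes above, with the finishing logic reduced to setting
desc:

// Sketch of a workaround: keep the combine functions purely associative and
// apply the guaranteed finishing step with mapValues, which runs once per key
// regardless of how the partial results were (or were not) merged.
val aggregated = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
  (acc: ABCoutput, v: ABC) => ABCoutput(v.key, "intermediate", acc.c1Sum + v.c1, acc.c2Sum + v.c2),
  (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "intermediate", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
val finalized = aggregated.mapValues(v => v.copy(desc = "final"))  // always runs

But I would still like to understand whether skipping mergeCombiner is the
intended contract of the API.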


Please let me know if you need any further information, and correct me if my
understanding of the API is wrong.

Thanks
Swapnil
