Ping. Can someone please confirm whether this is an issue or not?

-
Swapnil

On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde <swapnilushi...@gmail.com>
wrote:

> Hello All
>
> I am observing some strange results with the aggregateByKey API, which is
> implemented on top of combineByKey. I am not sure whether this is by
> design or a bug -
>
> I created this toy example, but the same problem can be observed on large
> datasets as well -
>
> case class ABC(key: Int, c1: Int, c2: Int)
> case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)
>
> // Create an RDD and make sure it has only 1 or 2 partitions for this
> // example. With 2 partitions there is a high chance that all values for
> // a given key end up in the same partition.
> val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20),
>   ABC(2, 20, 40), ABC(2, 20, 40))).coalesce(2)
>
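> To see how the keys actually land, something like the following prints
> the key-to-partition layout (a quick sketch; the exact layout varies from
> run to run):
>
> a.keyBy(_.key)
>   .mapPartitionsWithIndex { (idx, it) =>
>     it.map { case (k, _) => (idx, k) }  // (partition index, key) pairs
>   }
>   .collect()
>   .groupBy(_._1)  // group the observed keys by partition index
>   .foreach { case (p, ks) =>
>     println(s"partition $p -> keys ${ks.map(_._2).mkString(", ")}")
>   }
>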
> Now, I am running aggregateByKey, grouping by key to sum c1 and c2 but
> returning an ABCoutput with a new 'desc' property.
>
> val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>   // seqOp: fold a value into the running aggregate within a partition
>   // (use x2.key here, since the zero value's key 0 is just a placeholder)
>   (x1: ABCoutput, x2: ABC) =>
>     ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   // combOp: merge aggregates coming from different partitions
>   (m1: ABCoutput, m2: ABCoutput) =>
>     ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> The above query may return results like this:
> [image: results screenshot - one key shows desc = "intermediate" instead
> of "final"]
>
> It means that for a key whose values all sit in the same partition, the
> mergeCombiner function (the one that returns an ABCoutput with desc =
> "final") is never invoked.
>
> I was expecting the mergeCombiner function to be invoked every time, which
> is not happening. Correct me if I am wrong, but is this the expected
> behavior?
>
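> One way to confirm this is to count mergeCombiner invocations with a
> LongAccumulator (a sketch against the same RDD 'a' as above):
>
> val mergeCalls = sc.longAccumulator("mergeCombinerCalls")
> val checked = a.keyBy(x => x.key)
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (acc: ABCoutput, v: ABC) =>
>       ABCoutput(v.key, "intermediate", acc.c1Sum + v.c1, acc.c2Sum + v.c2),
>     (m1: ABCoutput, m2: ABCoutput) => {
>       mergeCalls.add(1)  // bumped only when combiners from different partitions merge
>       ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum)
>     })
> checked.collect()
> // prints 0 when each key's values all sit in one partition
> println(s"mergeCombiner invoked ${mergeCalls.value} times")
>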
> Further debugging shows that it works fine if I create the input RDD with
> more partitions (which increases the chance that rows with the same key
> land in different partitions):
>
> val b = a.repartition(20).keyBy(x => x.key)
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (x1: ABCoutput, x2: ABC) =>
>       ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>     (m1: ABCoutput, m2: ABCoutput) =>
>       ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
> [image: results screenshot - keys now show desc = "final"]
>
> One more thing to mention: if I make sure my input RDD is already
> partitioned by key, then the aggregation simply runs with mapPartitions
> (here
> <https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
> This makes sense for the aggregation itself, since all values for a given
> key are in the same partition. However, I have logic in my mergeCombiner
> function that I would like to run, and it never gets invoked.
> Traditional MapReduce lets you define different combiner and reduce
> functions, and it guarantees that reduce is always invoked. I can see the
> performance gain of running the aggregation without a shuffle, but the API
> seems confusing/misleading: a user might expect mergeCombiner to be
> invoked, but in reality it isn't. It would be great if the API designers
> could shed some light on this.
>
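> One workaround that comes to mind (a sketch, and it assumes the "final"
> step only needs the per-key sums) is to keep aggregateByKey purely for the
> sums and do the finalization in a separate map pass, which runs exactly
> once per key regardless of partitioning:
>
> val summed = a.keyBy(x => x.key).aggregateByKey((0, 0))(
>   (acc, v) => (acc._1 + v.c1, acc._2 + v.c2),   // per-partition sums
>   (m1, m2) => (m1._1 + m2._1, m1._2 + m2._2))   // cross-partition merge
> val finalized = summed.map { case (k, (c1, c2)) =>
>   ABCoutput(k, "final", c1, c2)  // finalization is always applied
> }
>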
> import org.apache.spark.HashPartitioner
>
> val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20))
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (x1: ABCoutput, x2: ABC) =>
>       ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>     (m1: ABCoutput, m2: ABCoutput) =>
>       ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> [image: results screenshot - with a pre-partitioned input, desc stays
> "intermediate"]
>
> The above examples show this behavior with aggregateByKey, but the same
> thing can be observed with combineByKey as well:
> val b = a.keyBy(x => x.key).combineByKey(
>   (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
>   (x1: ABCoutput, x2: ABC) =>
>     ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (x1: ABCoutput, x2: ABCoutput) =>
>     ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))
>
> [image: results screenshot - combineByKey shows the same behavior; desc
> stays "intermediate"]
>
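> This is not surprising, since aggregateByKey delegates to combineByKey
> internally. Roughly (a simplified sketch with a made-up name; the real
> PairRDDFunctions code also clones the zero value via serialization):
>
> import scala.reflect.ClassTag
> import org.apache.spark.rdd.RDD
>
> def aggregateByKeySketch[K: ClassTag, V: ClassTag, U: ClassTag](
>     rdd: RDD[(K, V)], zeroValue: U)(
>     seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] =
>   rdd.combineByKey(
>     (v: V) => seqOp(zeroValue, v),  // createCombiner = seqOp on the zero
>     seqOp,                          // mergeValue
>     combOp)                         // mergeCombiners - the step that may be skipped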
>
> Please let me know if you need any further information, and correct me if
> my understanding of the API is wrong.
>
> Thanks
> Swapnil
>
