Hello Fabian, Thanks for taking time to provide your recommendation, This is how I have implemented:
case class Something(f1: Int,f2: Int,f3: Int,f4: String ) // My application's data structure *val k = envDefault .fromElements(Something(1,1,2,"A"),Something(1,1,2,"B"),Something(2,1,3,"A"),Something(3,1,4,"C")) .map(e => (e.f1, e.f2, e.f3, e.f4,1)) // I create a temporary tuple .groupBy(1,2) .sum(4) .map(e => (Something(e._1,e._2,e._3,e._4),e._5)) .print* The output is *(Something(2,1,3,A),1) (Something(1,1,2,B),2) (Something(3,1,4,C),1)* I need to create a temporary tuple, because I need group by fields of the case class; yet, I need to sum the fifth (newly added) field. Somehow, I feel this is clunky! Is this a preferred way? Is there a better (performant, yet idiomatic) way? Please make me wiser. -- Nirmalya -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Count-of-Grouped-DataSet-tp6592p6623.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.