Hi,

When running CoGroup between the solution set and a different dataset inside a DeltaIteration, the CoGroupFunction only gets called for items that exist in the other dataset, similar to an inner join. This is not the documented behavior for CoGroup:
If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction is called with an empty group for the non-existing group.
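The documented semantics can be simulated on local Scala collections for reference. This is a sketch under stated assumptions, not Flink's implementation. The hypothetical helper `coGroup` calls the function once for every key found in either input and passes an empty iterator for the side that lacks the key. The equally hypothetical `coGroupDrivenBySecond` only visits keys of the second input, which reproduces the inner-join-like behavior observed inside the delta iteration. `describe` mirrors `coGroupFuntion` from the example below but returns strings instead of printing.

```scala
// Plain-Scala simulation of CoGroup semantics on local collections (no Flink involved).
// coGroup and coGroupDrivenBySecond are hypothetical helpers for illustration only.
object CoGroupSemanticsSketch {

  // Documented behavior: one call per key present in EITHER input;
  // the side without that key receives an empty iterator.
  def coGroup[K, A, B, R](left: Seq[A], right: Seq[B])(leftKey: A => K, rightKey: B => K)(
      f: (Iterator[A], Iterator[B]) => Seq[R]): Seq[R] = {
    val l = left.groupBy(leftKey)
    val r = right.groupBy(rightKey)
    (l.keySet ++ r.keySet).toSeq.flatMap { k =>
      f(l.getOrElse(k, Seq.empty).iterator, r.getOrElse(k, Seq.empty).iterator)
    }
  }

  // Observed behavior in the delta iteration: calls are driven only by keys of the
  // second input, so keys that exist only in the first input are never visited.
  def coGroupDrivenBySecond[K, A, B, R](left: Seq[A], right: Seq[B])(leftKey: A => K, rightKey: B => K)(
      f: (Iterator[A], Iterator[B]) => Seq[R]): Seq[R] = {
    val l = left.groupBy(leftKey)
    val r = right.groupBy(rightKey)
    r.keySet.toSeq.flatMap { k =>
      f(l.getOrElse(k, Seq.empty).iterator, r(k).iterator)
    }
  }

  // Same branching as coGroupFuntion, but returns labels instead of printing.
  def describe(first: Iterator[(Int, Int)], second: Iterator[(Int, Int)]): Seq[String] =
    if (second.hasNext) Seq(s"matched ${second.next()}")
    else Seq(s"not in second set: ${first.next()}")

  def main(args: Array[String]): Unit = {
    val d1 = Seq((1, 1), (2, 1), (3, 1))
    val d2 = d1.filter(_._1 != 1)
    println("Documented coGroup:")
    coGroup(d1, d2)(_._1, _._1)(describe _).sorted.foreach(println)
    println("Driven by second input only:")
    coGroupDrivenBySecond(d1, d2)(_._1, _._1)(describe _).sorted.foreach(println)
  }
}
```

With the same data as the example (keys 1, 2, 3 in d1 and d1 without key 1 in d2), only the first variant reaches the "not in second set" branch for key 1. That branch is the one whose output is missing from the delta-iteration run.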
The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {

  // Emits the element from the second group if there is one;
  // otherwise reports the key that exists only in the first group.
  def coGroupFuntion(first: Iterator[(Int, Int)],
                     second: Iterator[(Int, Int)],
                     out: Collector[(Int, Int)]): Unit = {
    if (second.hasNext) {
      out.collect(second.next())
    } else {
      printf("Not in second set: %s\n", first.next())
      println("These two lines don't appear when " +
        "running cogroup on solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    val d1 = env.fromElements(
      new Tuple2(1, 1),
      new Tuple2(2, 1),
      new Tuple2(3, 1)
    )

    // Delta iteration keyed on field 0, one iteration, CoGroup against the solution set
    d1.iterateDelta(d1, 1, Array(0)) {
      (solutionSet, workSet) => {
        val f = workSet.filter(_._1 != 1)
        println("Cogroup on solution set with delta iteration")
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0)
          .equalTo(0)
          .apply(coGroupFuntion _)
        (newSolutionSet, newSolutionSet)
      }
    }.print()

    // The same CoGroup outside of an iteration
    println("Normal cogroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
  }
}

Is this the expected behavior, or should I file a bug about this?

Best regards,
Kien Truong