Hi,

When running a CoGroup between the solution set and a different dataset
inside a DeltaIteration, the CoGroupFunction is only called for items
that exist in the other dataset, similar to an inner join. This is not
the documented behavior of CoGroup:

If a DataSet has a group with no matching key in the other DataSet,
the CoGroupFunction is called with an empty group for the non-existing
group.

The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

/**
 * Minimal reproduction comparing CoGroup behavior inside and outside a
 * delta iteration.
 *
 * The input holds three (key, value) pairs with keys 1, 2 and 3. The second
 * side of each CoGroup drops key 1. If the function is invoked for key 1 with
 * an empty second group, the `else` branch of [[coGroupFuntion]] prints two
 * lines to stdout, so the two runs can be compared by their output.
 */
object CoGroupExample {

  /**
   * CoGroup function shared by the delta-iteration run and the plain run.
   *
   * Emits the first element of the second group when that group is non-empty.
   * Otherwise it emits nothing and prints the first element of the first group
   * plus a marker line, making an empty-second-group call visible on stdout.
   *
   * @param first  elements of the first input that share the current key
   * @param second elements of the second input that share the current key
   * @param out    collector receiving the emitted element
   */
  def coGroupFuntion(first: Iterator[(Int, Int)],
                     second: Iterator[(Int, Int)],
                     out: Collector[(Int, Int)]): Unit =
    if (second.hasNext) {
      // Only the head of the group is forwarded; keys are unique in this example.
      out.collect(second.next())
    } else {
      // NOTE(review): assumes `first` is non-empty whenever `second` is empty,
      // i.e. the function is never called with two empty groups.
      printf("Not in second set: %s\n", first.next())
      println("These two lines doesn't appear when " +
        "running cogroup on solution set")
    }

  /**
   * Runs the same CoGroup twice and prints both results.
   *
   * The first run pairs the solution set of a single-iteration delta iteration
   * with a filtered workset. The second run is an ordinary CoGroup between two
   * DataSets.
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    // Keys 1, 2, 3; used as both the initial solution set and the initial workset.
    val d1 = env.fromElements((1, 1), (2, 1), (3, 1))

    // Solution set keyed on tuple field 0; at most one iteration.
    d1.iterateDelta(d1, 1, Array(0)) { (solutionSet, workSet) =>
      val f = workSet.filter(_._1 != 1)
      // Runs when the step function is invoked to build the iteration, not per record.
      println("Cogroup on solution set with delta iteration")
      val newSolutionSet = solutionSet.coGroup(f)
        .where(0)
        .equalTo(0)
        .apply(coGroupFuntion _)
      // The same DataSet serves as the solution-set delta and the next workset.
      (newSolutionSet, newSolutionSet)
    }.print()

    println("Normal cogroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
  }
}



Is this the expected behavior, or should I file a bug about it?

Best regards,
Kien Truong

Reply via email to