Hi Flink Dev Team,

I have two possible bugs to report for Flink ML Iteration.

Flink v1.17.2
Flink ML v2.3.0
Java 11

Bug #1

Implementing a UDF KeyedRichCoProcessFunction or CoFlatMapFunction inside the IterationBody yields a "java.lang.ClassCastException: org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple" error. For reference, I do not get any error when applying .keyBy().flatMap() on the streams individually inside the iteration body.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ….
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    …
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)

Potential Bug #2

The onEpochWatermarkIncremented method is never invoked when the IterationListener<T> interface is implemented by a UDF inside the IterationBody.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// This function is applied from within the IterationBody.
public class ComputeML2
        extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
        implements IterationListener<Tuple2<Integer, String>> {

    // This method is never invoked; I get no output from it.
    @Override
    public void onEpochWatermarkIncremented(
            int epochWaterMark,
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        collector.collect(Tuple2.of(epochWaterMark, "epoch")); // Bug: no output
    }

    @Override
    public void onIterationTerminated(
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
    }

    @Override
    public void processElement(
            Tuple2<Integer, String> value,
            KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        // some processing here
    }
}

Let me know if I should submit these issues on JIRA.

Thank you so much,
Komal
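
P.S. In case it helps with triage of Bug #1: the TupleComparator and ComparableKeySelector frames suggest that a key extractor written for Tuple records is being handed the IterationRecord wrapper instead of the unwrapped value. Below is a Flink-free sketch of that failure pattern only. Pair, Wrapped, keyOf and failsWithClassCast are hypothetical stand-ins I made up for illustration (they are not Flink classes or APIs), and this is my reading of the trace, not a confirmed root cause.

```java
public class KeyCastSketch {

    // Hypothetical stand-in for a tuple record (not Flink's Tuple2).
    static final class Pair {
        final Integer f0;
        final String f1;

        Pair(Integer f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Hypothetical stand-in for a wrapper that carries the user record
    // plus iteration metadata (assumption: loosely analogous to IterationRecord).
    static final class Wrapped<T> {
        final T value;
        final int epoch;

        Wrapped(T value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    // Key extractor that assumes every record it sees is a Pair.
    static Object keyOf(Object record) {
        return ((Pair) record).f1; // throws ClassCastException for non-Pair input
    }

    // Returns true if key extraction fails with a ClassCastException.
    static boolean failsWithClassCast(Object record) {
        try {
            keyOf(record);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Pair p = new Pair(1, "a");
        System.out.println(failsWithClassCast(p));                   // prints "false"
        System.out.println(failsWithClassCast(new Wrapped<>(p, 0))); // prints "true"
    }
}
```

The extractor works on the bare record and fails only once the record is wrapped, which matches the shape of the exception above: the cast happens inside key extraction, before any of my UDF code runs.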