Hi Flink Dev Team,
I have two possible bugs to report for Flink ML Iteration.
Flink v1.17.2
Flink ML v2.3.0
Java 11
Bug # 1
Using a UDF that extends KeyedRichCoProcessFunction or implements
CoFlatMapFunction inside the IterationBody fails at runtime with a
“java.lang.ClassCastException: org.apache.flink.iteration.IterationRecord
cannot be cast to class org.apache.flink.api.java.tuple.Tuple” error (full
stack trace below). For reference, I do not get any error when I apply
.keyBy().flatMap() to each stream individually inside the iteration body.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ….
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    …
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
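Reading the innermost frames, the ClassCastException is raised in
TupleComparator.extractKeys, reached from
AbstractAllRoundWrapperOperator.setKeyContextElement1, so the key selector
seems to receive the IterationRecord wrapper rather than the tuple it carries.
The plain-Java sketch below only illustrates that suspected mechanism and is
not Flink code: PairTuple, WrappedRecord, keyByPosition and keyOfWrapped are
hypothetical stand-ins made up for the illustration.

```java
// Illustration only: every class here is a hypothetical stand-in, not a Flink class.
class KeySelectorCastSketch {

    // Stand-in for a Flink tuple such as Tuple2<Integer, String>.
    static final class PairTuple {
        final Integer f0;
        final String f1;

        PairTuple(Integer f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Stand-in for IterationRecord: an envelope that adds an epoch to the user value.
    static final class WrappedRecord<T> {
        final int epoch;
        final T value;

        WrappedRecord(int epoch, T value) {
            this.epoch = epoch;
            this.value = value;
        }
    }

    // Position-based key extraction that assumes the incoming object is a tuple.
    static Object keyByPosition(Object record, int position) {
        PairTuple tuple = (PairTuple) record; // ClassCastException if given the wrapper
        return position == 0 ? tuple.f0 : tuple.f1;
    }

    // Unwrapping the envelope first lets the same extraction succeed.
    static Object keyOfWrapped(WrappedRecord<?> record, int position) {
        return keyByPosition(record.value, position);
    }

    public static void main(String[] args) {
        WrappedRecord<PairTuple> wrapped = new WrappedRecord<>(0, new PairTuple(7, "a"));
        try {
            keyByPosition(wrapped, 1); // the wrapper is handed to the tuple-based extractor
        } catch (ClassCastException e) {
            System.out.println("cast failed on the wrapper");
        }
        System.out.println(keyOfWrapped(wrapped, 1)); // key taken from the unwrapped tuple
    }
}
```

Whether the wrapper operator is expected to unwrap the record before keying
for two-input operators is exactly what I am unsure about; the sketch only
shows why a tuple-based key extractor cannot accept an envelope type.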
Potential Bug # 2
The onEpochWatermarkIncremented method is never invoked when a UDF used inside
the IterationBody implements the IterationListener<T> interface.
// This function is applied to a stream from within the IterationBody.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ComputeML2
        extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
        implements IterationListener<Tuple2<Integer, String>> {

    // Expected to run each time the epoch watermark advances, but it is never invoked.
    @Override
    public void onEpochWatermarkIncremented(
            int epochWaterMark,
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        collector.collect(Tuple2.of(epochWaterMark, "epoch")); // Bug: no output
    }

    // Intentionally empty: nothing is emitted on termination.
    @Override
    public void onIterationTerminated(
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
    }

    @Override
    public void processElement(
            Tuple2<Integer, String> integerStringTuple2,
            Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        // some processing here
    }
}
Let me know if I should submit these issues on JIRA.
Thank you so much
Komal