Hi Flink Dev Team,
I would like to report two possible bugs in Flink ML Iteration, found with the following versions:
Flink v1.17.2
Flink ML v2.3.0
Java 11

Potential Bug #1
Using a KeyedCoProcessFunction or CoFlatMapFunction UDF on two connected, keyed
streams inside the IterationBody fails with "java.lang.ClassCastException:
class org.apache.flink.iteration.IterationRecord cannot be cast to class
org.apache.flink.api.java.tuple.Tuple". For reference, applying
.keyBy().flatMap() to each stream individually inside the iteration body works
without any error.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ...
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    ...
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
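
To make Bug #1 easier to reproduce, below is a minimal sketch of a pipeline of this shape. It assumes the two connected streams are keyed by tuple field position through the deprecated ConnectedStreams.keyBy(int, int), because the trace goes through ComparableKeySelector and TupleComparator. The class Bug1Repro, the PassThrough function, the empty feedback, and the sample elements are hypothetical, and I have not confirmed that this exact program throws the exception.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical reproducer sketch for Bug #1; names and data are illustrative only.
public class Bug1Repro {

    // Hypothetical UDF: forwards records from both inputs unchanged.
    public static class PassThrough
            implements CoFlatMapFunction<
                    Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
        @Override
        public void flatMap1(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out) {
            out.collect(value);
        }

        @Override
        public void flatMap2(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out) {
            out.collect(value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps every operator, including the feedback, at the same parallelism.
        env.setParallelism(1);

        DataStream<Tuple2<Integer, String>> variables =
                env.fromElements(Tuple2.of(0, "a"), Tuple2.of(0, "b"));
        DataStream<Tuple2<Integer, String>> data =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

        DataStreamList outputs =
                Iterations.iterateUnboundedStreams(
                        DataStreamList.of(variables),
                        DataStreamList.of(data),
                        (variableStreams, dataStreams) -> {
                            DataStream<Tuple2<Integer, String>> vars = variableStreams.get(0);
                            DataStream<Tuple2<Integer, String>> input = dataStreams.get(0);
                            // Assumption: positional keys (field 1 on both sides), as the
                            // TupleComparator frame in the trace suggests.
                            DataStream<Tuple2<Integer, String>> joined =
                                    vars.connect(input).keyBy(1, 1).flatMap(new PassThrough());
                            // Empty feedback: only the keyed two-input operator matters here.
                            DataStream<Tuple2<Integer, String>> feedback = joined.filter(t -> false);
                            return new IterationBodyResult(
                                    DataStreamList.of(feedback), DataStreamList.of(joined));
                        });

        outputs.<Tuple2<Integer, String>>get(0).print();
        env.execute("bug-1-repro");
    }
}
```

For comparison, the same body with explicit key selectors, for example .keyBy(t -> t.f1, t -> t.f1), would show whether the positional-key path is what matters; I have not run that variant either.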



Potential Bug #2

The onEpochWatermarkIncremented method is never invoked when a UDF used inside
the IterationBody implements the IterationListener<T> interface.



import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// This function is applied to a keyed stream inside the IterationBody.
public class ComputeML2
        extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
        implements IterationListener<Tuple2<Integer, String>> {

    // This method is never invoked, so no output is produced.
    @Override
    public void onEpochWatermarkIncremented(
            int epochWatermark,
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector)
            throws Exception {
        collector.collect(Tuple2.of(epochWatermark, "epoch")); // Bug: no output
    }

    @Override
    public void onIterationTerminated(
            IterationListener.Context context, Collector<Tuple2<Integer, String>> collector)
            throws Exception {}

    @Override
    public void processElement(
            Tuple2<Integer, String> value,
            KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
            Collector<Tuple2<Integer, String>> collector)
            throws Exception {
        // some processing here
    }
}
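
A sketch of how a function like ComputeML2 can be wired into an iteration follows, using a hypothetical EpochProbe that emits a record only from onEpochWatermarkIncremented. The choice of iterateBoundedStreamsUntilTermination with the default IterationConfig, keying by f1, the empty feedback, and the sample data are assumptions for illustration and may differ from my actual job.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.IterationConfig;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.iteration.ReplayableDataStreamList;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical reproducer sketch for Bug #2; names and data are illustrative only.
public class Bug2Repro {

    // Hypothetical probe: emits one record per epoch watermark it is notified about.
    public static class EpochProbe
            extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
            implements IterationListener<Tuple2<Integer, String>> {

        @Override
        public void processElement(
                Tuple2<Integer, String> value,
                KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context ctx,
                Collector<Tuple2<Integer, String>> out) {
            // Intentionally emits nothing, so any output comes from the epoch callback.
        }

        @Override
        public void onEpochWatermarkIncremented(
                int epochWatermark,
                IterationListener.Context context,
                Collector<Tuple2<Integer, String>> out) {
            out.collect(Tuple2.of(epochWatermark, "epoch"));
        }

        @Override
        public void onIterationTerminated(
                IterationListener.Context context, Collector<Tuple2<Integer, String>> out) {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<Integer, String>> init = env.fromElements(Tuple2.of(0, "a"));
        DataStream<Tuple2<Integer, String>> data =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

        DataStreamList outputs =
                Iterations.iterateBoundedStreamsUntilTermination(
                        DataStreamList.of(init),
                        ReplayableDataStreamList.notReplay(data),
                        IterationConfig.newBuilder().build(),
                        (variableStreams, dataStreams) -> {
                            DataStream<Tuple2<Integer, String>> vars = variableStreams.get(0);
                            DataStream<Tuple2<Integer, String>> input = dataStreams.get(0);
                            DataStream<Tuple2<Integer, String>> probed =
                                    vars.union(input).keyBy(t -> t.f1).process(new EpochProbe());
                            // Empty feedback: no records are fed back into later rounds.
                            return new IterationBodyResult(
                                    DataStreamList.of(probed.filter(t -> false)),
                                    DataStreamList.of(probed));
                        });

        // If the callback fires, "epoch" records should appear in the printed output.
        outputs.<Tuple2<Integer, String>>get(0).print();
        env.execute("bug-2-repro");
    }
}
```

In my actual job, the equivalent "epoch" output never appears.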




Please let me know whether I should file these issues in JIRA.
Thank you so much
Komal
