Hi,

As the DataStream API's iterativeStream method has been deprecated for future Flink releases, the documentation recommends using Flink ML's iteration as an alternative. I am trying to build my understanding of the new Iterations API, as it will be a requirement for our future projects.
As an exercise, I'm trying to implement a KeyedCoProcessFunction inside the iteration body that takes the feedback stream and the non-feedback stream as inputs, but I get the following error. Do you know what could be causing it? For reference, I do not get any error when applying a .keyBy().flatMap() to each stream individually inside the iteration body.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ...
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    ...
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)

I am attaching the full test code below for reference. All it does is subtract 1 from each tuple in the feedback stream until it reaches 0.0. For each subtraction it outputs a relevant message to the finalOutput stream. These messages are stored in the keyed state of the KeyedCoProcessFunction and are preloaded into the parallel instances by a DataStream called initialStates. Each key has different messages associated with it, hence the need for MapState.
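As a side note, the per-stream pattern that does work for me looks roughly like this (a simplified sketch; the variable name decremented and the flatMap body are just illustrative, the point is the lambda KeySelector instead of a positional key):

```java
// Sketch only: keying the feedback stream alone with a lambda KeySelector
// and applying a flatMap runs without the ClassCastException inside the
// iteration body.
DataStream<Tuple2<String, Double>> decremented = modelUpdate
        .keyBy(x -> x.f0) // lambda KeySelector rather than keyBy(0)
        .flatMap((Tuple2<String, Double> in, Collector<Tuple2<String, Double>> out) ->
                out.collect(Tuple2.of(in.f0, in.f1 - 1)))
        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
```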
import java.util.*;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Test {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment, which is the main entry point.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Sample data streams (assumed to be unbounded outside of this test environment).
        List<Tuple2<String, Double>> feedbackInitializer = Arrays.asList(
                new Tuple2<>("A", 2.0),
                new Tuple2<>("B", 3.0),
                new Tuple2<>("C", 1.0),
                new Tuple2<>("D", 1.0));

        List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
                new Tuple3<>("A", 0.0, "Final Output A"),
                new Tuple3<>("A", 1.0, "Test 1A"),
                new Tuple3<>("B", 2.0, "Test 2B"),
                new Tuple3<>("B", 1.0, "Test 1B"),
                new Tuple3<>("B", 0.0, "Final Output B"),
                new Tuple3<>("C", 0.0, "No Change C"),
                new Tuple3<>("D", 0.0, "Final Output D"));

        DataStream<Tuple2<String, Double>> feedbackStream = env.fromCollection(feedbackInitializer);
        DataStream<Tuple3<String, Double, String>> initialStateStream = env.fromCollection(initialStates);

        // Parallelize.
        DataStream<Tuple2<String, Double>> feedbackParallel = feedbackStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple2.of(i.f0, i.f1))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
        DataStream<Tuple3<String, Double, String>> initialStateParallel = initialStateStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));

        // Iterate.
        DataStreamList result = Iterations.iterateUnboundedStreams(
                DataStreamList.of(feedbackParallel),
                DataStreamList.of(initialStateParallel),
                (variableStreams, dataStreams) -> {
                    DataStream<Tuple2<String, Double>> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple3<String, Double, String>> stateStream = dataStreams.get(0);

                    OutputTag<String> finalOutputTag = new OutputTag<String>("msgs") {};

                    SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate = stateStream
                            .connect(modelUpdate)
                            .keyBy(0, 0) // positional keyBy on the connected streams
                            .process(new KeyedCoProcessFunction<String, Tuple3<String, Double, String>, Tuple2<String, Double>, Tuple2<String, Double>>() {

                                private transient MapState<Double, String> state;

                                @Override
                                public void open(Configuration parameters) {
                                    // Initialize the keyed MapState.
                                    state = getRuntimeContext().getMapState(
                                            new MapStateDescriptor<>("msgs", Types.DOUBLE, Types.STRING));
                                }

                                @Override
                                public void processElement1(Tuple3<String, Double, String> stateUpdate,
                                        Context context,
                                        Collector<Tuple2<String, Double>> collector) throws Exception {
                                    // Load stateStream into the MapState.
                                    state.put(stateUpdate.f1, stateUpdate.f2);
                                }

                                @Override
                                public void processElement2(Tuple2<String, Double> modelUpdate,
                                        Context context,
                                        Collector<Tuple2<String, Double>> collector) throws Exception {
                                    double weight = modelUpdate.f1 - 1; // subtract 1 until 0.0
                                    if (weight > -1.0) {
                                        collector.collect(Tuple2.of(modelUpdate.f0, weight));
                                        context.output(finalOutputTag, state.get(weight));
                                    }
                                }
                            });

                    DataStream<String> finalOutput = newModelUpdate.getSideOutput(finalOutputTag);
                    return new IterationBodyResult(
                            DataStreamList.of(newModelUpdate),
                            DataStreamList.of(finalOutput));
                });

        result.get(0).print();

        // Execute program.
        env.execute("Flink Java API Skeleton");
    }
}

Any help will be greatly appreciated.

Thanks,
Komal