Hi,

As the DataStream API's iterativeStream method has been deprecated for future 
Flink releases, the documentation recommends using Flink ML's iteration as an 
alternative. I am trying to build my understanding of the new iterations API, as 
it will be a requirement for our future projects.

As an exercise, I'm trying to implement a KeyedCoProcessFunction inside the 
iteration body that takes the feedback stream and the non-feedback stream as 
inputs, but I get the following error. Do you know what could be causing it? For 
reference, I do not get any error when applying a .keyBy().flatMap() on the 
streams individually inside the iteration body.
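
For context, this is roughly the shape of the per-stream transform that runs without error inside the iteration body (a minimal sketch, not my exact code; the flatMap body is illustrative):

```java
// Inside the iteration body: keying a single stream with a lambda key
// selector and applying a flatMap works fine. Only the two-input
// connect().keyBy(0, 0).process(...) variant fails for me.
DataStream<Tuple2<String, Double>> decremented = modelUpdate
        .keyBy(x -> x.f0)  // lambda key selector, not positional keyBy
        .flatMap((Tuple2<String, Double> value,
                  Collector<Tuple2<String, Double>> out) -> {
            if (value.f1 > 0.0) {
                out.collect(Tuple2.of(value.f0, value.f1 - 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
```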

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
                at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
                ….
                at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
                at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
…
                at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
                ... 5 more
Caused by: java.lang.ClassCastException: class 
org.apache.flink.iteration.IterationRecord cannot be cast to class 
org.apache.flink.api.java.tuple.Tuple 
(org.apache.flink.iteration.IterationRecord and 
org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
                at 
org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
                at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
                at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
                at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
                at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
                at 
org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
                at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
                at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
                at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
                at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
                at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
                at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
                at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
                at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
                at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
                at java.base/java.lang.Thread.run(Thread.java:829)


I am attaching the full test code below for reference. All it does is subtract 
1 from the tuples in the feedback stream until they reach 0.0. For each 
subtraction it outputs a relevant message to the finalOutput stream. These 
messages are stored in the keyed state of the KeyedCoProcessFunction and are 
preloaded into the parallel instances by a dataset stream called initialStates. 
Each key has different messages associated with it, hence the need for MapState.



import java.util.*;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


public class Test {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment, which is the main entry point.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Sample data streams (assumed to be unbounded outside of this test environment).
        List<Tuple2<String, Double>> feedbackInitializer = Arrays.asList(
                new Tuple2<>("A", 2.0),
                new Tuple2<>("B", 3.0),
                new Tuple2<>("C", 1.0),
                new Tuple2<>("D", 1.0)
        );

        List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
                new Tuple3<>("A", 0.0, "Final Output A"),
                new Tuple3<>("A", 1.0, "Test 1A"),
                new Tuple3<>("B", 2.0, "Test 2B"),
                new Tuple3<>("B", 1.0, "Test 1B"),
                new Tuple3<>("B", 0.0, "Final Output B"),
                new Tuple3<>("C", 0.0, "No Change C"),
                new Tuple3<>("D", 0.0, "Final Output D")
        );

        DataStream<Tuple2<String, Double>> feedbackStream = env.fromCollection(feedbackInitializer);
        DataStream<Tuple3<String, Double, String>> initialStateStream = env.fromCollection(initialStates);

        // Parallelize.
        DataStream<Tuple2<String, Double>> feedbackParallel = feedbackStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple2.of(i.f0, i.f1))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
        DataStream<Tuple3<String, Double, String>> initialStateParallel = initialStateStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));

        // Iterate.
        DataStreamList result = Iterations.iterateUnboundedStreams(
                DataStreamList.of(feedbackParallel),
                DataStreamList.of(initialStateParallel),
                (variableStreams, dataStreams) -> {
                    DataStream<Tuple2<String, Double>> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple3<String, Double, String>> stateStream = dataStreams.get(0);

                    OutputTag<String> finalOutputTag = new OutputTag<String>("msgs") {};

                    SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate = stateStream
                            .connect(modelUpdate)
                            .keyBy(0, 0)  // positional keyBy on the connected streams
                            .process(new KeyedCoProcessFunction<String, Tuple3<String, Double, String>, Tuple2<String, Double>, Tuple2<String, Double>>() {
                                private transient MapState<Double, String> state;

                                @Override
                                public void open(Configuration parameters) {
                                    state = getRuntimeContext().getMapState(
                                            new MapStateDescriptor<>("messages", Types.DOUBLE, Types.STRING));
                                }

                                @Override
                                public void processElement1(Tuple3<String, Double, String> stateUpdate, Context context, Collector<Tuple2<String, Double>> collector) throws Exception {
                                    // Load stateStream into MapState.
                                    state.put(stateUpdate.f1, stateUpdate.f2);
                                }

                                @Override
                                public void processElement2(Tuple2<String, Double> modelUpdate, Context context, Collector<Tuple2<String, Double>> collector) throws Exception {
                                    double weight = modelUpdate.f1 - 1;
                                    // Subtract 1 until the weight reaches 0.0.
                                    if (weight > -1.0) {
                                        collector.collect(Tuple2.of(modelUpdate.f0, weight));
                                        context.output(finalOutputTag, state.get(weight));
                                    }
                                }
                            });

                    DataStream<String> finalOutput = newModelUpdate.getSideOutput(finalOutputTag);

                    return new IterationBodyResult(
                            DataStreamList.of(newModelUpdate),
                            DataStreamList.of(finalOutput));
                });

        result.get(0).print();

        // Execute program.
        env.execute("Flink Java API Skeleton");
    }
}


Any help will be greatly appreciated.

Thanks,
Komal



Reply via email to