Hi,
As the DataStream API's IterativeStream (DataStream.iterate()) is being deprecated in future
Flink releases, the documentation recommends using Flink ML's iteration API as an
alternative. I am trying to build my understanding of the new iteration API, as
it will be a requirement for our future projects.
As an exercise, I'm trying to implement a KeyedCoProcessFunction inside the
iteration body that takes the feedback stream and the non-feedback stream as inputs,
but I get the following error. Do you know what could be causing it? For
reference, I do not get any error when applying .keyBy().flatMap() to each
stream individually inside the iteration body.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ….
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    …
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
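Below is a plain-Java toy model of one possible reading of the trace. Pair and Wrapped are hypothetical stand-ins, not Flink classes. The model assumes two things. First, the positional keyBy(0, 0) produces a key extractor that casts the incoming object directly to a tuple, which matches TupleComparator.extractKeys receiving an IterationRecord in the trace. Second, a KeySelector-based keyBy (as in the working keyBy(x -> x.f0).flatMap() case) is applied to the unwrapped user value. That second point is an assumption and has not been checked against the Flink ML sources.

```java
import java.util.function.Function;

// Toy model only: Pair and Wrapped are hypothetical stand-ins, NOT Flink classes.
public class KeyingToyModel {

    // Stand-in for a user tuple such as Tuple2<String, Double>.
    static final class Pair {
        final String f0;
        final double f1;
        Pair(String f0, double f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Stand-in for a runtime wrapper (similar in role to IterationRecord) around the user value.
    static final class Wrapped<T> {
        final T value;
        Wrapped(T value) { this.value = value; }
    }

    // Position-style extraction: casts whatever object arrives to the tuple type,
    // similar in spirit to TupleComparator.extractKeys casting to Tuple.
    static String positionalKey(Object incoming) {
        return ((Pair) incoming).f0;
    }

    // Selector-style extraction (assumed behaviour): the wrapper is unwrapped
    // before the user's key function runs.
    static <T> String selectorKey(Wrapped<T> incoming, Function<T, String> selector) {
        return selector.apply(incoming.value);
    }

    public static void main(String[] args) {
        Wrapped<Pair> record = new Wrapped<>(new Pair("A", 2.0));
        System.out.println("selector key: " + selectorKey(record, p -> p.f0));
        try {
            positionalKey(record);
        } catch (ClassCastException e) {
            System.out.println("positional key: ClassCastException");
        }
    }
}
```

Running main prints "selector key: A" followed by "positional key: ClassCastException". Under these assumptions, this mirrors the difference between the working keyBy(x -> x.f0) case and the failing keyBy(0, 0) case.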
I am attaching the full test code below for reference. All it does is subtract
1 from each value in the feedback stream until it reaches 0.0. For each subtraction,
it emits a corresponding message to the final output stream. These messages are
stored in the keyed state of the KeyedCoProcessFunction and are preloaded into the
parallel instances by a data stream built from initialStates. Each key has
different messages associated with it, hence the need for MapState.
import java.util.*;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Test {
    public static void main(String[] args) throws Exception {
        // Sets up the execution environment, which is the main entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Sample data streams (they are assumed to be unbounded streams outside
        // of this test environment)
        List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
                new Tuple2<>("A", 2.0),
                new Tuple2<>("B", 3.0),
                new Tuple2<>("C", 1.0),
                new Tuple2<>("D", 1.0)
        );
        List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
                new Tuple3<>("A", 0.0, "Final Output A"),
                new Tuple3<>("A", 1.0, "Test 1A"),
                new Tuple3<>("B", 2.0, "Test 2B"),
                new Tuple3<>("B", 1.0, "Test 1B"),
                new Tuple3<>("B", 0.0, "Final Output B"),
                new Tuple3<>("C", 0.0, "No Change C"),
                new Tuple3<>("D", 0.0, "Final Output D")
        );
        DataStream<Tuple2<String, Double>> feedbackStream = env.fromCollection(feedbackinitializer);
        DataStream<Tuple3<String, Double, String>> initialStateStream = env.fromCollection(initialStates);

        // Parallelize
        DataStream<Tuple2<String, Double>> feedbackParallel = feedbackStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple2.of(i.f0, i.f1))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
        DataStream<Tuple3<String, Double, String>> initialStateParallel = initialStateStream
                .keyBy(x -> x.f0)
                .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));

        // Iterate
        DataStreamList result = Iterations.iterateUnboundedStreams(
                DataStreamList.of(feedbackParallel),
                DataStreamList.of(initialStateParallel),
                (variableStreams, dataStreams) -> {
                    DataStream<Tuple2<String, Double>> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple3<String, Double, String>> stateStream = dataStreams.get(0);
                    OutputTag<String> finalOutputTag = new OutputTag<String>("msgs") {};

                    SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate = stateStream
                            .connect(modelUpdate)
                            .keyBy(0, 0)
                            .process(new KeyedCoProcessFunction<String, Tuple3<String, Double, String>,
                                    Tuple2<String, Double>, Tuple2<String, Double>>() {
                                private transient MapState<Double, String> state;

                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    // Keyed MapState must be obtained from the runtime context before use
                                    state = getRuntimeContext().getMapState(
                                            new MapStateDescriptor<>("messages", Types.DOUBLE, Types.STRING));
                                }

                                @Override
                                public void processElement1(Tuple3<String, Double, String> stateUpdate,
                                        Context context, Collector<Tuple2<String, Double>> collector)
                                        throws Exception {
                                    // Load stateStream into the MapState
                                    state.put(stateUpdate.f1, stateUpdate.f2);
                                }

                                @Override
                                public void processElement2(Tuple2<String, Double> modelUpdate,
                                        Context context, Collector<Tuple2<String, Double>> collector)
                                        throws Exception {
                                    // Subtract 1 until the value reaches 0.0
                                    double weight = modelUpdate.f1;
                                    weight = weight - 1;
                                    if (weight > -1.0) {
                                        collector.collect(Tuple2.of(modelUpdate.f0, weight));
                                        context.output(finalOutputTag, state.get(weight));
                                    }
                                }
                            });

                    DataStream<String> finalOutput = newModelUpdate.getSideOutput(finalOutputTag);
                    return new IterationBodyResult(
                            DataStreamList.of(newModelUpdate),
                            DataStreamList.of(finalOutput));
                });

        result.get(0).print();

        // Execute program
        env.execute("Flink Java API Skeleton");
    }
}
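For reference, the messages the job is meant to emit can be sketched without Flink. This is a plain-Java sketch, not the Flink job. CountdownSketch, countdown and runAll are hypothetical names. The sketch processes keys one at a time and assumes that all state rows are loaded before the countdown starts. In the real job, the two inputs race each other, and messages for different keys may interleave in any order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of the countdown the test job is meant to perform.
public class CountdownSketch {

    // Mirrors processElement2 plus the feedback edge: subtract 1, emit while the result > -1.0.
    static List<String> countdown(double start, Map<Double, String> messages) {
        List<String> out = new ArrayList<>();
        double weight = start - 1;
        while (weight > -1.0) {
            out.add(messages.get(weight)); // null if no message is stored for this value
            weight = weight - 1;
        }
        return out;
    }

    // Plays the role of processElement1: stores one (value -> message) entry per key.
    static void load(Map<String, Map<Double, String>> state, String key, double value, String msg) {
        state.computeIfAbsent(key, k -> new HashMap<>()).put(value, msg);
    }

    // Same rows as initialStates and feedbackinitializer in the test code.
    static Map<String, List<String>> runAll() {
        Map<String, Map<Double, String>> state = new HashMap<>();
        load(state, "A", 0.0, "Final Output A");
        load(state, "A", 1.0, "Test 1A");
        load(state, "B", 2.0, "Test 2B");
        load(state, "B", 1.0, "Test 1B");
        load(state, "B", 0.0, "Final Output B");
        load(state, "C", 0.0, "No Change C");
        load(state, "D", 0.0, "Final Output D");

        Map<String, Double> starts = new LinkedHashMap<>();
        starts.put("A", 2.0);
        starts.put("B", 3.0);
        starts.put("C", 1.0);
        starts.put("D", 1.0);

        Map<String, List<String>> emitted = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : starts.entrySet()) {
            emitted.put(e.getKey(), countdown(e.getValue(), state.get(e.getKey())));
        }
        return emitted;
    }

    public static void main(String[] args) {
        runAll().forEach((key, msgs) -> System.out.println(key + " -> " + msgs));
    }
}
```

Running main prints one line per key, starting with "A -> [Test 1A, Final Output A]" and "B -> [Test 2B, Test 1B, Final Output B]".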
Any help will be greatly appreciated.
Thanks,
Komal