Zhipeng Zhang created FLINK-31255:
-------------------------------------
Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
Key: FLINK-31255
URL: https://issues.apache.org/jira/browse/FLINK-31255
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang
Currently, we use operator wrappers to enable normal operators to be used inside
iterations. However, the operatorConfig passed to the wrapped operator is not
correctly unwrapped. For example, the following code fails because the wrapped
operator is given the wrong type serializer.
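{code:java}
// Imports assumed for this snippet (Flink ML 2.x, JUnit 4):
import java.util.List;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBody;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.IterationConfig;
import org.apache.flink.iteration.IterationConfig.OperatorLifeCycle;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.iteration.ReplayableDataStreamList;
import org.apache.flink.ml.common.datastream.DataStreamUtils;
import org.apache.flink.ml.common.iteration.TerminateOnMaxIter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NumberSequenceIterator;
import org.junit.Test;

@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
            env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);

    // PER_ROUND lifecycle: every operator in the body runs inside a wrapper operator.
    DataStreamList result =
            Iterations.iterateBoundedStreamsUntilTermination(
                    DataStreamList.of(input),
                    ReplayableDataStreamList.notReplay(input),
                    IterationConfig.newBuilder()
                            .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                            .build(),
                    new IterationBodyWithMapPartition());

    // The output stream carries Longs, not Integers.
    List<Long> outputs = IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(outputs.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {
    @Override
    public IterationBodyResult process(
            DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);

        // Identity mapPartition; its operator buffers records in ListStateWithCache.
        DataStream<Long> mapPartitionResult =
                DataStreamUtils.mapPartition(
                        input,
                        new MapPartitionFunction<Long, Long>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Long> values, Collector<Long> out)
                                    throws Exception {
                                for (Long value : values) {
                                    out.collect(value);
                                }
                            }
                        });

        // Stop the iteration once the maximum number of rounds (2) is reached.
        DataStream<Integer> terminationCriteria =
                mapPartitionResult.flatMap(new TerminateOnMaxIter<Long>(2)).returns(Types.INT);

        return new IterationBodyResult(
                DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
}
{code}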
The error stack trace is:
{noformat}
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
	at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
	at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
	at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
	at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
	at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
	at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)
{noformat}
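The stack trace suggests that the wrapped MapPartitionOperator was handed the wrapper's IterationRecordSerializer where it expected a serializer for plain Long values. The following is a hypothetical, self-contained Java model of that failure mode, not Flink's actual implementation: EpochRecord, RecordSerializer, createWrappedConfig, serializeInput and the "typeSerializerIn" key are illustrative stand-ins for IterationRecord, IterationRecordSerializer, OperatorUtils#createWrappedOperatorConfig, the wrapped operator and its input serializer.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of FLINK-31255. These classes only mimic the roles of
 * IterationRecord, IterationRecordSerializer and the operator config; they are not Flink code.
 */
public class WrappedConfigSketch {

    /** Stand-in for a TypeSerializer; "serializes" a value to a String. */
    interface Serializer<T> {
        String serialize(T value);
    }

    /** Stand-in for IterationRecord: a value tagged with the epoch it belongs to. */
    static final class EpochRecord<T> {
        final T value;
        final int epoch;

        EpochRecord(T value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long value) {
            return Long.toString(value);
        }
    }

    /** Stand-in for IterationRecordSerializer: it can only serialize EpochRecord instances. */
    static final class RecordSerializer<T> implements Serializer<EpochRecord<T>> {
        final Serializer<T> inner;

        RecordSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public String serialize(EpochRecord<T> record) {
            return record.epoch + ":" + inner.serialize(record.value);
        }
    }

    /** Builds the wrapped operator's config: each RecordSerializer is replaced by its inner serializer. */
    static Map<String, Serializer<?>> createWrappedConfig(Map<String, Serializer<?>> wrapperConfig) {
        Map<String, Serializer<?>> wrapped = new HashMap<>();
        for (Map.Entry<String, Serializer<?>> entry : wrapperConfig.entrySet()) {
            Serializer<?> s = entry.getValue();
            wrapped.put(entry.getKey(), s instanceof RecordSerializer ? ((RecordSerializer<?>) s).inner : s);
        }
        return wrapped;
    }

    /** Mimics the wrapped operator: it sees plain values and serializes them with the configured serializer. */
    @SuppressWarnings("unchecked")
    static String serializeInput(Map<String, Serializer<?>> config, Object value) {
        Serializer<Object> serializer = (Serializer<Object>) config.get("typeSerializerIn");
        return serializer.serialize(value);
    }

    public static void main(String[] args) {
        Map<String, Serializer<?>> wrapperConfig = new HashMap<>();
        wrapperConfig.put("typeSerializerIn", new RecordSerializer<>(new LongSerializer()));

        // Unwrapped config: the wrapped operator gets a LongSerializer, so a plain Long works.
        System.out.println(serializeInput(createWrappedConfig(wrapperConfig), 42L));

        // Wrapper config passed through as-is: the RecordSerializer receives a plain Long.
        try {
            serializeInput(wrapperConfig, 42L);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Running main prints 42 for the unwrapped config and then reports a ClassCastException for the config that was passed through unchanged, which mirrors the Long-to-IterationRecord cast failure above. Replacing record-level serializers with their element serializers when building the wrapped config is an assumption about what createWrappedOperatorConfig is meant to do, not a description of the actual fix.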