Xuannan Su created FLINK-28126:
----------------------------------

             Summary: Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
                 Key: FLINK-28126
                 URL: https://issues.apache.org/jira/browse/FLINK-28126
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.0.0
            Reporter: Xuannan Su
The iteration gets stuck when a replayable data stream and its downstream operator have different parallelism. It can be reproduced with the following code snippet:
{code:java}
@Test
public void testIteration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final SingleOutputStreamOperator<Integer> variable = env.fromElements(0).name("i");
    final SingleOutputStreamOperator<Integer> data =
            env.fromElements(1, 2)
                    .name("inc")
                    .map(x -> x)
                    .setParallelism(1); // The test passes if this parallelism is set to 2.
    final IterationConfig config = IterationConfig.newBuilder().build();
    Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(variable),
            ReplayableDataStreamList.replay(data),
            config,
            (IterationBody)
                    (variableStreams, dataStreams) -> {
                        final DataStream<Integer> sample = dataStreams.get(0);
                        final SingleOutputStreamOperator<Integer> trainOutput =
                                sample.transform(
                                                "iter",
                                                TypeInformation.of(Integer.class),
                                                new IterTransform())
                                        .setParallelism(2)
                                        .map((MapFunction<Integer, Integer>) integer -> integer)
                                        .setParallelism(1);
                        return new IterationBodyResult(
                                DataStreamList.of(trainOutput), DataStreamList.of(trainOutput));
                    });
    env.execute();
}

public static class IterTransform extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {

    @Override
    public void processElement(StreamRecord<Integer> element) throws Exception {
        LOG.info("Processing element: {}", element);
    }

    @Override
    public void onEpochWatermarkIncremented(
            int epochWatermark, Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
        if (epochWatermark >= 10) {
            return;
        }
        collector.collect(0);
    }

    @Override
    public void onIterationTerminated(Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onIterationTerminated");
    }
}
{code}
After digging into the code, I found that the `ReplayOperator`
does not emit the epoch watermark through a broadcast output, so the watermark may not reach every subtask of a downstream operator that has a different parallelism. [~gaoyunhaii], could you take a look and confirm whether this is the case?
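To illustrate the suspected mechanism, here is a toy model in plain Java with no Flink dependencies. The class and method names (`EpochWatermarkSketch`, `deliver`, `canAdvance`) are hypothetical and do not correspond to Flink ML internals. It relies on the fact that when an upstream operator with parallelism 1 feeds a downstream operator with parallelism 2, Flink connects them with a round-robin (rebalance) partitioner, so a point-to-point emit hands each event to a single downstream channel. It also makes the simplifying assumption that the iteration can only move on once every downstream subtask has seen the epoch watermark.

{code:java}
import java.util.Arrays;

// Toy model only (not Flink code): one upstream subtask hands an epoch
// watermark event to the channels of its downstream operator.
class EpochWatermarkSketch {

    // Returns the epoch watermark seen by each downstream subtask (-1 = none).
    // broadcast=false models a point-to-point emit through a round-robin
    // (rebalance-style) selector, so the event lands on a single channel.
    static int[] deliver(int downstreamChannels, int nextChannel, int epoch, boolean broadcast) {
        int[] seen = new int[downstreamChannels];
        Arrays.fill(seen, -1);
        if (broadcast) {
            // Broadcast: every channel receives its own copy of the event.
            Arrays.fill(seen, epoch);
        } else {
            // Point-to-point: only the channel picked by the selector gets it.
            seen[nextChannel % downstreamChannels] = epoch;
        }
        return seen;
    }

    // Simplifying assumption: the next round can start only after every
    // downstream subtask has observed the epoch watermark.
    static boolean canAdvance(int[] seen, int epoch) {
        for (int s : seen) {
            if (s < epoch) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // 1 -> 2 (rebalance), point-to-point emit: one subtask never sees epoch 0.
        int[] rebalanced = deliver(2, 0, 0, false);
        // 1 -> 2 (rebalance), broadcast emit: both subtasks see epoch 0.
        int[] broadcasted = deliver(2, 0, 0, true);
        // 2 -> 2 (forward): each upstream subtask has exactly one channel.
        int[] forwarded = deliver(1, 0, 0, false);

        System.out.println(Arrays.toString(rebalanced) + " advance=" + canAdvance(rebalanced, 0));
        System.out.println(Arrays.toString(broadcasted) + " advance=" + canAdvance(broadcasted, 0));
        System.out.println(Arrays.toString(forwarded) + " advance=" + canAdvance(forwarded, 0));
    }
}
{code}

Running `main` prints `[0, -1] advance=false`, `[0, 0] advance=true` and `[0] advance=true`. The last case is consistent with the observation in the reproduction that the test passes when the replayed stream also has parallelism 2, since equal parallelism yields a forward connection with a single channel per upstream subtask.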