Xuannan Su created FLINK-28126:
----------------------------------

             Summary: Iteration gets stuck when replayable datastream and its 
downstream operator have different parallelism
                 Key: FLINK-28126
                 URL: https://issues.apache.org/jira/browse/FLINK-28126
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.0.0
            Reporter: Xuannan Su


The iteration gets stuck when a replayable datastream and its downstream 
operator have different parallelism. It can be reproduced with the following 
code snippet:


{code:java}
    @Test
    public void testIteration() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final SingleOutputStreamOperator<Integer> variable =
                env.fromElements(0).name("i");
        final SingleOutputStreamOperator<Integer> data =
                env.fromElements(1, 2)
                        .name("inc")
                        .map(x -> x)
                        .setParallelism(1); // test can pass if parallelism is 2.

        final IterationConfig config = IterationConfig.newBuilder().build();

        Iterations.iterateBoundedStreamsUntilTermination(
                DataStreamList.of(variable),
                ReplayableDataStreamList.replay(data),
                config,
                (IterationBody) (variableStreams, dataStreams) -> {
                    final DataStream<Integer> sample = dataStreams.get(0);
                    final SingleOutputStreamOperator<Integer> trainOutput =
                            sample
                                    .transform(
                                            "iter",
                                            TypeInformation.of(Integer.class),
                                            new IterTransform())
                                    .setParallelism(2)
                                    .map((MapFunction<Integer, Integer>) integer -> integer)
                                    .setParallelism(1);

                    return new IterationBodyResult(
                            DataStreamList.of(trainOutput),
                            DataStreamList.of(trainOutput));
                });

        env.execute();
    }

    public static class IterTransform extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {

        @Override
        public void processElement(StreamRecord<Integer> element) throws Exception {
            LOG.info("Processing element: {}", element);
        }

        @Override
        public void onEpochWatermarkIncremented(
                int epochWatermark, Context context, Collector<Integer> collector)
                throws Exception {
            LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
            if (epochWatermark >= 10) {
                return;
            }
            collector.collect(0);
        }

        @Override
        public void onIterationTerminated(Context context, Collector<Integer> collector)
                throws Exception {
            LOG.info("onIterationTerminated");
        }
    }
{code}

After digging into the code, I found that the `ReplayOperator` doesn't emit the 
epoch watermark with a broadcast output. [~gaoyunhaii], could you look to see 
if this is the case?
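
To illustrate why a non-broadcast epoch watermark could stall the job, here is a toy model in plain Java. It does not use Flink and is not the actual `ReplayOperator` logic. The class `EpochWatermarkSketch`, the `Subtask` type, and the method `subtasksMissingWatermark` are all hypothetical names. The sketch assumes that a single upstream subtask feeds N downstream subtasks, and that a downstream subtask cannot advance its epoch until it has received the watermark for that epoch.

```java
import java.util.ArrayList;
import java.util.List;

public class EpochWatermarkSketch {

    /** Hypothetical stand-in for one downstream subtask. */
    static final class Subtask {
        int lastEpochWatermark = -1;

        void onEpochWatermark(int epoch) {
            lastEpochWatermark = Math.max(lastEpochWatermark, epoch);
        }
    }

    /**
     * Sends one epoch watermark from a single upstream subtask and returns
     * how many downstream subtasks never received it.
     */
    static int subtasksMissingWatermark(int downstreamParallelism, int epoch, boolean broadcast) {
        List<Subtask> downstream = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream.add(new Subtask());
        }

        if (broadcast) {
            // Broadcast: every downstream subtask sees the watermark.
            for (Subtask subtask : downstream) {
                subtask.onEpochWatermark(epoch);
            }
        } else {
            // Assumption: the watermark is routed like an ordinary record,
            // so it reaches exactly one downstream channel.
            int target = Math.floorMod(epoch, downstreamParallelism);
            downstream.get(target).onEpochWatermark(epoch);
        }

        int missing = 0;
        for (Subtask subtask : downstream) {
            if (subtask.lastEpochWatermark < epoch) {
                missing++;
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("1 -> 2, no broadcast: " + subtasksMissingWatermark(2, 0, false));
        System.out.println("1 -> 2, broadcast:    " + subtasksMissingWatermark(2, 0, true));
        System.out.println("1 -> 1, no broadcast: " + subtasksMissingWatermark(1, 0, false));
    }
}
```

In this model, equal parallelism leaves no subtask waiting, while a 1-to-2 fan-out without broadcast leaves one subtask without the watermark. That would be consistent with the test passing only when the parallelism matches, if the suspicion above is correct.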





--
This message was sent by Atlassian Jira
(v8.20.7#820007)
