This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 014c217  [FLINK-24650][iteration] Add unbounded iteration.
014c217 is described below

commit 014c217d66132ddf150684e0c8d73339c1434415
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Tue Sep 28 13:59:59 2021 +0800

    [FLINK-24650][iteration] Add unbounded iteration.
    
    This closes #12.
---
 .../apache/flink/iteration/IterationFactory.java   | 258 +++++++++++++++++++++
 .../org/apache/flink/iteration/Iterations.java     |   4 +-
 .../compile/DraftExecutionEnvironment.java         |  17 +-
 .../EpochAwareAllRoundProcessFunction.java         |   4 +-
 .../flink/iteration/IterationConstructionTest.java | 177 ++++++++++++++
 .../itcases/UnboundedStreamIterationITCase.java    | 255 ++++++++++++++++++++
 .../iteration/itcases/operators/CollectSink.java   |  39 ++++
 .../iteration/itcases/operators/EpochRecord.java   |  53 +++++
 .../itcases/operators/IncrementEpochMap.java       |  30 +++
 .../iteration/itcases/operators/OutputRecord.java  |  73 ++++++
 .../operators/ReduceAllRoundProcessFunction.java   | 119 ++++++++++
 .../itcases/operators/SequenceSource.java          |  60 +++++
 .../TwoInputReduceAllRoundProcessFunction.java     |  81 +++++++
 13 files changed, 1164 insertions(+), 6 deletions(-)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java
new file mode 100644
index 0000000..3235651
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.HeadOperatorFactory;
+import org.apache.flink.iteration.operator.InputOperator;
+import org.apache.flink.iteration.operator.OperatorWrapper;
+import org.apache.flink.iteration.operator.OutputOperator;
+import org.apache.flink.iteration.operator.TailOperator;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Creates an iteration in a job. */
+@Internal
+public class IterationFactory {
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static DataStreamList createIteration(
+            DataStreamList initVariableStreams,
+            DataStreamList dataStreams,
+            IterationBody body,
+            OperatorWrapper<?, IterationRecord<?>> initialOperatorWrapper,
+            boolean mayHaveCriteria) {
+        checkState(initVariableStreams.size() > 0, "There should be at least one variable stream");
+
+        IterationID iterationId = new IterationID();
+
+        List<TypeInformation<?>> initVariableTypeInfos = getTypeInfos(initVariableStreams);
+        List<TypeInformation<?>> dataStreamTypeInfos = getTypeInfos(dataStreams);
+
+        // Add heads and inputs
+        int totalInitVariableParallelism =
+                map(
+                                initVariableStreams,
+                                dataStream ->
+                                        dataStream.getParallelism() > 0
+                                                ? dataStream.getParallelism()
+                                                : dataStream
+                                                        .getExecutionEnvironment()
+                                                        .getConfig()
+                                                        .getParallelism())
+                        .stream()
+                        .mapToInt(i -> i)
+                        .sum();
+        DataStreamList initVariableInputs = addInputs(initVariableStreams, false);
+        DataStreamList headStreams =
+                addHeads(
+                        initVariableStreams,
+                        initVariableInputs,
+                        iterationId,
+                        totalInitVariableParallelism,
+                        false,
+                        0);
+
+        DataStreamList dataStreamInputs = addInputs(dataStreams, true);
+
+        // Create the iteration body. We map the inputs of the iteration body into the draft
+        // sources, which serve as the start points to build the draft subgraph.
+        StreamExecutionEnvironment env = initVariableStreams.get(0).getExecutionEnvironment();
+        DraftExecutionEnvironment draftEnv =
+                new DraftExecutionEnvironment(env, initialOperatorWrapper);
+        DataStreamList draftHeadStreams =
+                addDraftSources(headStreams, draftEnv, initVariableTypeInfos);
+        DataStreamList draftDataStreamInputs =
+                addDraftSources(dataStreamInputs, draftEnv, dataStreamTypeInfos);
+
+        IterationBodyResult iterationBodyResult =
+                body.process(draftHeadStreams, draftDataStreamInputs);
+        ensuresTransformationAdded(iterationBodyResult.getFeedbackVariableStreams(), draftEnv);
+        ensuresTransformationAdded(iterationBodyResult.getOutputStreams(), draftEnv);
+        draftEnv.copyToActualEnvironment();
+
+        // Add tails and co-locate them with the heads.
+        DataStreamList feedbackStreams =
+                getActualDataStreams(iterationBodyResult.getFeedbackVariableStreams(), draftEnv);
+        checkState(
+                feedbackStreams.size() == initVariableStreams.size(),
+                "The number of feedback streams "
+                        + feedbackStreams.size()
+                        + " does not match the initialized one "
+                        + initVariableStreams.size());
+        DataStreamList tails = addTails(feedbackStreams, iterationId, 0);
+        for (int i = 0; i < headStreams.size(); ++i) {
+            String coLocationGroupKey = "co-" + iterationId.toHexString() + "-" + i;
+            headStreams.get(i).getTransformation().setCoLocationGroupKey(coLocationGroupKey);
+            tails.get(i).getTransformation().setCoLocationGroupKey(coLocationGroupKey);
+        }
+
+        checkState(
+                mayHaveCriteria || iterationBodyResult.getTerminationCriteria() == null,
+                "The current iteration type does not support the termination criteria.");
+
+        // TODO: will consider the termination criteria in the next commit.
+
+        return addOutputs(getActualDataStreams(iterationBodyResult.getOutputStreams(), draftEnv));
+    }
+
+    private static List<TypeInformation<?>> getTypeInfos(DataStreamList dataStreams) {
+        return map(dataStreams, DataStream::getType);
+    }
+
+    private static DataStreamList addInputs(
+            DataStreamList dataStreams, boolean insertMaxEpochWatermark) {
+        return new DataStreamList(
+                map(
+                        dataStreams,
+                        dataStream ->
+                                dataStream
+                                        .transform(
+                                                "input-" + 
dataStream.getTransformation().getName(),
+                                                new 
IterationRecordTypeInfo<>(dataStream.getType()),
+                                                new 
InputOperator(insertMaxEpochWatermark))
+                                        
.setParallelism(dataStream.getParallelism())));
+    }
+
+    private static DataStreamList addHeads(
+            DataStreamList variableStreams,
+            DataStreamList inputStreams,
+            IterationID iterationId,
+            int totalInitVariableParallelism,
+            boolean isCriteriaStream,
+            int startHeaderIndex) {
+
+        return new DataStreamList(
+                map(
+                        inputStreams,
+                        (index, dataStream) ->
+                                ((SingleOutputStreamOperator<IterationRecord<?>>) dataStream)
+                                        .transform(
+                                                "head-"
+                                                        + variableStreams
+                                                                .get(index)
+                                                                .getTransformation()
+                                                                .getName(),
+                                                (IterationRecordTypeInfo) dataStream.getType(),
+                                                new HeadOperatorFactory(
+                                                        iterationId,
+                                                        startHeaderIndex + index,
+                                                        isCriteriaStream,
+                                                        totalInitVariableParallelism))
+                                        .setParallelism(dataStream.getParallelism())));
+    }
+
+    private static DataStreamList addTails(
+            DataStreamList dataStreams, IterationID iterationId, int startIndex) {
+        return new DataStreamList(
+                map(
+                        dataStreams,
+                        (index, dataStream) ->
+                                ((DataStream<IterationRecord<?>>) dataStream)
+                                        .transform(
+                                                "tail-" + dataStream.getTransformation().getName(),
+                                                new IterationRecordTypeInfo(dataStream.getType()),
+                                                new TailOperator(iterationId, startIndex + index))
+                                        .setParallelism(dataStream.getParallelism())));
+    }
+
+    private static DataStreamList addOutputs(DataStreamList dataStreams) {
+        return new DataStreamList(
+                map(
+                        dataStreams,
+                        (index, dataStream) -> {
+                            IterationRecordTypeInfo<?> inputType =
+                                    (IterationRecordTypeInfo<?>) dataStream.getType();
+                            return dataStream
+                                    .transform(
+                                            "output-" + dataStream.getTransformation().getName(),
+                                            inputType.getInnerTypeInfo(),
+                                            new OutputOperator())
+                                    .setParallelism(dataStream.getParallelism());
+                        }));
+    }
+
+    private static DataStreamList addDraftSources(
+            DataStreamList dataStreams,
+            DraftExecutionEnvironment draftEnv,
+            List<TypeInformation<?>> typeInfos) {
+
+        return new DataStreamList(
+                map(
+                        dataStreams,
+                        (index, dataStream) ->
+                                draftEnv.addDraftSource(dataStream, typeInfos.get(index))));
+    }
+
+    private static void ensuresTransformationAdded(
+            DataStreamList dataStreams, DraftExecutionEnvironment draftEnv) {
+        map(
+                dataStreams,
+                dataStream -> {
+                    draftEnv.addOperatorIfNotExists(dataStream.getTransformation());
+                    return null;
+                });
+    }
+
+    private static void setCriteriaParallelism(
+            DataStreamList headStreams, int criteriaParallelism) {
+        map(
+                headStreams,
+                dataStream -> {
+                    ((HeadOperatorFactory)
+                                    ((OneInputTransformation) dataStream.getTransformation())
+                                            .getOperatorFactory())
+                            .setCriteriaStreamParallelism(criteriaParallelism);
+                    return null;
+                });
+    }
+
+    private static DataStreamList getActualDataStreams(
+            DataStreamList draftStreams, DraftExecutionEnvironment draftEnv) {
+        return new DataStreamList(
+                map(draftStreams, dataStream -> draftEnv.getActualStream(dataStream.getId())));
+    }
+
+    private static <R> List<R> map(DataStreamList dataStreams, Function<DataStream<?>, R> mapper) {
+        return map(dataStreams, (i, dataStream) -> mapper.apply(dataStream));
+    }
+
+    private static <R> List<R> map(
+            DataStreamList dataStreams, BiFunction<Integer, DataStream<?>, R> mapper) {
+        List<R> results = new ArrayList<>(dataStreams.size());
+        for (int i = 0; i < dataStreams.size(); ++i) {
+            DataStream<?> dataStream = dataStreams.get(i);
+            results.add(mapper.apply(i, dataStream));
+        }
+
+        return results;
+    }
+}
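
For orientation, IterationFactory compiles the logical iteration into a physical
topology: each initial variable stream passes through an input operator and a head
operator, the user-defined body is built inside a draft environment and copied into
the actual one, and each feedback stream ends in a tail operator co-located with its
head so feedback records stay within the same slot. A sketch of the resulting graph
for a single variable stream, inferred from the code above and from the vertex names
asserted in IterationConstructionTest below:

    Source -> input-Variable -> head-Variable -> (iteration body) -> tail-...
                                      ^                                 |
                                      +----- feedback (co-located) -----+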
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 24789e6..556c001 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -19,6 +19,7 @@
 package org.apache.flink.iteration;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
 
 /**
  * A helper class to create iterations. To construct an iteration, Users are required to provide
@@ -86,7 +87,8 @@ public class Iterations {
      */
     public static DataStreamList iterateUnboundedStreams(
             DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
-        return null;
+        return IterationFactory.createIteration(
+                initVariableStreams, dataStreams, body, new AllRoundOperatorWrapper(), false);
     }
 
     /**
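
With this wiring in place, an unbounded iteration can be built from the public entry
point. A minimal usage sketch, condensed from the integration tests added below; the
bounded fromElements input, the identity map as feedback, and print() as the sink are
illustrative choices only:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> initVariable = env.fromElements(1, 2, 3);

    DataStreamList outputs =
            Iterations.iterateUnboundedStreams(
                    DataStreamList.of(initVariable),
                    DataStreamList.of(),
                    (variableStreams, dataStreams) -> {
                        DataStream<Integer> variable = variableStreams.<Integer>get(0);
                        // Feed the records back unchanged; a real body would update them.
                        return new IterationBodyResult(
                                DataStreamList.of(variable.map(x -> x)),
                                DataStreamList.of(variable));
                    });
    outputs.<Integer>get(0).print();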
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java
index 6b7db68..e0b7d4e 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.iteration.compile;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -51,8 +50,10 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -88,6 +89,8 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment {
 
     private final StreamExecutionEnvironment actualEnv;
 
+    private final Set<Integer> explicitlyAddedTransformations;
+
     private final Map<Integer, OperatorWrapper<?, ?>> draftWrappers;
 
     private final Map<Integer, Transformation<?>> draftToActualTransformations;
@@ -103,6 +106,7 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment {
                 ReflectionUtils.getFieldValue(
                         actualEnv, StreamExecutionEnvironment.class, "userClassloader"));
         this.actualEnv = actualEnv;
+        this.explicitlyAddedTransformations = new HashSet<>();
         this.draftWrappers = new HashMap<>();
         this.draftToActualTransformations = new HashMap<>();
 
@@ -126,6 +130,7 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment {
         // Record the wrapper
         recordWrapper(transformation);
         super.addOperator(transformation);
+        explicitlyAddedTransformations.add(transformation.getId());
     }
 
     private void recordWrapper(Transformation<?> transformation) {
@@ -141,6 +146,12 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment {
         }
     }
 
+    public void addOperatorIfNotExists(Transformation<?> transformation) {
+        if (!explicitlyAddedTransformations.contains(transformation.getId())) {
+            addOperator(transformation);
+        }
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     public <T> DataStream<T> addDraftSource(
             DataStream<?> actualStream, TypeInformation<T> draftOutputType) {
@@ -216,8 +227,8 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment {
         }
     }
 
-    @VisibleForTesting
-    static class EmptySource<T> extends RichParallelSourceFunction<T> {
+    /** A special source that emits no data. */
+    public static class EmptySource<T> extends RichParallelSourceFunction<T> {
 
         @Override
         public void run(SourceContext<T> ctx) throws Exception {}
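
EmptySource becomes public here so that tests (and, by the same token, jobs whose
variable input is driven purely by feedback) can use it as a placeholder source that
emits nothing. A usage sketch mirroring the tests added below; the claim about type
extraction is our reading of why the anonymous subclass is used:

    // The anonymous subclass ({}) pins the element type for Flink's type extraction.
    DataStream<Integer> variableSource =
            env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                    .name("Variable");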
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java
index 163da32..35bfd85 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java
@@ -47,6 +47,6 @@ public abstract class EpochAwareAllRoundProcessFunction<I, O> extends ProcessFun
         processElement(input, epochSupplier.get(), context, collector);
     }
 
-    public abstract void processElement(
-            I input, int epoch, Context context, Collector<O> collector);
+    public abstract void processElement(I input, int epoch, Context context, Collector<O> collector)
+            throws Exception;
 }
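
The hunk above widens the abstract method to declare throws Exception, so concrete
subclasses can propagate checked exceptions instead of wrapping them. A hypothetical
subclass for illustration (SumPerEpochFunction is an invented name, not part of this
commit):

    public class SumPerEpochFunction extends EpochAwareAllRoundProcessFunction<Integer, Integer> {
        @Override
        public void processElement(
                Integer input, int epoch, Context context, Collector<Integer> collector)
                throws Exception {
            // Free to call code that throws checked exceptions, e.g. state accessors.
            collector.collect(input + epoch);
        }
    }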
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
new file mode 100644
index 0000000..d6873bf
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration;
+
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+/** Verifies that the created job graph satisfies the expectation. */
+public class IterationConstructionTest extends TestLogger {
+
+    @Test
+    public void testEmptyIterationBody() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
+                        .name("Variable");
+        DataStreamList result =
+                Iterations.iterateUnboundedStreams(
+                        DataStreamList.of(variableSource),
+                        DataStreamList.of(),
+                        ((variableStreams, dataStreams) ->
+                                new IterationBodyResult(variableStreams, dataStreams)));
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        List<String> expectedVertexNames =
+                Arrays.asList(
+                        /* 0 */ "Source: Variable -> input-Variable",
+                        /* 1 */ "head-Variable",
+                        /* 2 */ "tail-head-Variable");
+        List<Integer> expectedParallelisms = Arrays.asList(4, 4, 4);
+
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertEquals(
+                expectedVertexNames,
+                vertices.stream().map(JobVertex::getName).collect(Collectors.toList()));
+        assertEquals(
+                expectedParallelisms,
+                vertices.stream().map(JobVertex::getParallelism).collect(Collectors.toList()));
+        assertNotNull(vertices.get(1).getCoLocationGroup());
+        assertNotNull(vertices.get(2).getCoLocationGroup());
+        assertSame(vertices.get(1).getCoLocationGroup(), vertices.get(2).getCoLocationGroup());
+    }
+
+    @Test
+    public void testUnboundedIteration() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Integer> variableSource1 =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
+                        .setParallelism(2)
+                        .name("Variable0");
+        DataStream<Integer> variableSource2 =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
+                        .setParallelism(2)
+                        .name("Variable1");
+
+        DataStream<Integer> constantSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
+                        .setParallelism(3)
+                        .name("Constant");
+
+        DataStreamList result =
+                Iterations.iterateUnboundedStreams(
+                        DataStreamList.of(variableSource1, variableSource2),
+                        DataStreamList.of(constantSource),
+                        new IterationBody() {
+
+                            @Override
+                            public IterationBodyResult process(
+                                    DataStreamList variableStreams, DataStreamList dataStreams) {
+                                SingleOutputStreamOperator<Integer> processor =
+                                        variableStreams
+                                                .<Integer>get(0)
+                                                .union(variableStreams.<Integer>get(1))
+                                                .connect(dataStreams.<Integer>get(0))
+                                                .process(
+                                                        new CoProcessFunction<
+                                                                Integer, Integer, Integer>() {
+                                                            @Override
+                                                            public void processElement1(
+                                                                    Integer value,
+                                                                    Context ctx,
+                                                                    Collector<Integer> out)
+                                                                    throws Exception {}
+
+                                                            @Override
+                                                            public void processElement2(
+                                                                    Integer value,
+                                                                    Context ctx,
+                                                                    Collector<Integer> out)
+                                                                    throws Exception {}
+                                                        })
+                                                .name("Processor")
+                                                .setParallelism(4);
+
+                                return new IterationBodyResult(
+                                        DataStreamList.of(
+                                                processor
+                                                        .map(x -> x)
+                                                        .name("Feedback0")
+                                                        .setParallelism(2),
+                                                processor
+                                                        .map(x -> x)
+                                                        .name("Feedback1")
+                                                        .setParallelism(3)),
+                                        DataStreamList.of(
+                                                processor.getSideOutput(
+                                                        new OutputTag<Integer>("output") {})));
+                            }
+                        });
+        result.get(0).addSink(new DiscardingSink<>()).name("Sink").setParallelism(4);
+
+        List<String> expectedVertexNames =
+                Arrays.asList(
+                        /* 0 */ "Source: Variable0 -> input-Variable0",
+                        /* 1 */ "Source: Variable1 -> input-Variable1",
+                        /* 2 */ "Source: Constant -> input-Constant",
+                        /* 3 */ "head-Variable0",
+                        /* 4 */ "head-Variable1",
+                        /* 5 */ "Processor -> output-SideOutput -> Sink: Sink",
+                        /* 6 */ "Feedback0",
+                        /* 7 */ "tail-Feedback0",
+                        /* 8 */ "Feedback1",
+                        /* 9 */ "tail-Feedback1");
+        List<Integer> expectedParallelisms = Arrays.asList(2, 2, 3, 2, 2, 4, 2, 2, 3, 3);
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertEquals(
+                expectedVertexNames,
+                vertices.stream().map(JobVertex::getName).collect(Collectors.toList()));
+        assertEquals(
+                expectedParallelisms,
+                vertices.stream().map(JobVertex::getParallelism).collect(Collectors.toList()));
+
+        assertNotNull(vertices.get(3).getCoLocationGroup());
+        assertNotNull(vertices.get(4).getCoLocationGroup());
+        assertSame(vertices.get(3).getCoLocationGroup(), vertices.get(7).getCoLocationGroup());
+        assertSame(vertices.get(4).getCoLocationGroup(), vertices.get(9).getCoLocationGroup());
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java
new file mode 100644
index 0000000..c133fe1
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.itcases.operators.CollectSink;
+import org.apache.flink.iteration.itcases.operators.EpochRecord;
+import org.apache.flink.iteration.itcases.operators.IncrementEpochMap;
+import org.apache.flink.iteration.itcases.operators.OutputRecord;
+import org.apache.flink.iteration.itcases.operators.ReduceAllRoundProcessFunction;
+import org.apache.flink.iteration.itcases.operators.SequenceSource;
+import org.apache.flink.iteration.itcases.operators.TwoInputReduceAllRoundProcessFunction;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/** Integration tests for the unbounded iteration. */
+public class UnboundedStreamIterationITCase extends TestLogger {
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private MiniCluster miniCluster;
+
+    private SharedReference<BlockingQueue<OutputRecord<Integer>>> result;
+
+    @Before
+    public void setup() throws Exception {
+        miniCluster = new MiniCluster(createMiniClusterConfiguration(2, 2));
+        miniCluster.start();
+
+        result = sharedObjects.add(new LinkedBlockingQueue<>());
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (miniCluster != null) {
+            miniCluster.close();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testVariableOnlyUnboundedIteration() throws Exception {
+        // Create the test job
+        JobGraph jobGraph = createVariableOnlyJobGraph(4, 1000, true, 0, false, 1, result);
+        miniCluster.submitJob(jobGraph);
+
+        // Expected records is round * parallelism * numRecordsPerSource
+        Map<Integer, Tuple2<Integer, Integer>> roundsStat =
+                computeRoundStat(result.get(), 2 * 4 * 1000);
+        verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2);
+    }
+
+    @Test(timeout = 60000)
+    public void testVariableOnlyBoundedIteration() throws Exception {
+        // Create the test job
+        JobGraph jobGraph = createVariableOnlyJobGraph(4, 1000, false, 0, false, 1, result);
+        miniCluster.executeJobBlocking(jobGraph);
+
+        assertEquals(8001, result.get().size());
+
+        // Expected records is round * parallelism * numRecordsPerSource
+        Map<Integer, Tuple2<Integer, Integer>> roundsStat =
+                computeRoundStat(result.get(), 2 * 4 * 1000);
+        verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2);
+        assertEquals(OutputRecord.Event.TERMINATED, result.get().take().getEvent());
+    }
+
+    @Test(timeout = 60000)
+    public void testVariableAndConstantsUnboundedIteration() throws Exception {
+        // Create the test job
+        JobGraph jobGraph = createVariableAndConstantJobGraph(4, 1000, true, 0, false, 1, result);
+        miniCluster.submitJob(jobGraph);
+
+        // Expected records is round * parallelism * numRecordsPerSource
+        Map<Integer, Tuple2<Integer, Integer>> roundsStat =
+                computeRoundStat(result.get(), 2 * 4 * 1000);
+        verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2);
+    }
+
+    @Test(timeout = 60000)
+    public void testVariableAndConstantBoundedIteration() throws Exception {
+        // Create the test job
+        JobGraph jobGraph = createVariableAndConstantJobGraph(4, 1000, false, 0, false, 1, result);
+        miniCluster.executeJobBlocking(jobGraph);
+
+        assertEquals(8001, result.get().size());
+
+        // Expected records is round * parallelism * numRecordsPerSource
+        Map<Integer, Tuple2<Integer, Integer>> roundsStat =
+                computeRoundStat(result.get(), 2 * 4 * 1000);
+        verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2);
+        assertEquals(OutputRecord.Event.TERMINATED, result.get().take().getEvent());
+    }
+
+    static MiniClusterConfiguration createMiniClusterConfiguration(int numTm, int numSlot) {
+        Configuration configuration = new Configuration();
+        configuration.set(RestOptions.BIND_PORT, "18081-19091");
+        return new MiniClusterConfiguration.Builder()
+                .setConfiguration(configuration)
+                .setNumTaskManagers(numTm)
+                .setNumSlotsPerTaskManager(numSlot)
+                .build();
+    }
+
+    static JobGraph createVariableOnlyJobGraph(
+            int numSources,
+            int numRecordsPerSource,
+            boolean holdSource,
+            int period,
+            boolean sync,
+            int maxRound,
+            SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<EpochRecord> source =
+                env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period))
+                        .setParallelism(numSources);
+        DataStreamList outputs =
+                Iterations.iterateUnboundedStreams(
+                        DataStreamList.of(source),
+                        DataStreamList.of(),
+                        (variableStreams, dataStreams) -> {
+                            SingleOutputStreamOperator<EpochRecord> reducer =
+                                    variableStreams
+                                            .<EpochRecord>get(0)
+                                            .process(
+                                                    new ReduceAllRoundProcessFunction(
+                                                            sync, maxRound));
+                            return new IterationBodyResult(
+                                    DataStreamList.of(
+                                            reducer.map(new IncrementEpochMap())
+                                                    .setParallelism(numSources)),
+                                    DataStreamList.of(
+                                            reducer.getSideOutput(
+                                                    new OutputTag<OutputRecord<Integer>>(
+                                                            "output") {})));
+                        });
+        outputs.<OutputRecord<Integer>>get(0).addSink(new CollectSink(result));
+
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    static JobGraph createVariableAndConstantJobGraph(
+            int numSources,
+            int numRecordsPerSource,
+            boolean holdSource,
+            int period,
+            boolean sync,
+            int maxRound,
+            SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<EpochRecord> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})
+                        .setParallelism(numSources)
+                        .name("Variable");
+        DataStream<EpochRecord> constSource =
+                env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period))
+                        .setParallelism(numSources)
+                        .name("Constant");
+        DataStreamList outputs =
+                Iterations.iterateUnboundedStreams(
+                        DataStreamList.of(variableSource),
+                        DataStreamList.of(constSource),
+                        (variableStreams, dataStreams) -> {
+                            SingleOutputStreamOperator<EpochRecord> reducer =
+                                    variableStreams
+                                            .<EpochRecord>get(0)
+                                            .connect(dataStreams.<EpochRecord>get(0))
+                                            .process(
+                                                    new TwoInputReduceAllRoundProcessFunction(
+                                                            sync, maxRound));
+                            return new IterationBodyResult(
+                                    DataStreamList.of(
+                                            reducer.map(new IncrementEpochMap())
+                                                    .setParallelism(numSources)),
+                                    DataStreamList.of(
+                                            reducer.getSideOutput(
+                                                    new OutputTag<OutputRecord<Integer>>(
+                                                            "output") {})));
+                        });
+        outputs.<OutputRecord<Integer>>get(0).addSink(new CollectSink(result));
+
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    static Map<Integer, Tuple2<Integer, Integer>> computeRoundStat(
+            BlockingQueue<OutputRecord<Integer>> result, int expectedRecords)
+            throws InterruptedException {
+        Map<Integer, Tuple2<Integer, Integer>> roundsStat = new HashMap<>();
+        for (int i = 0; i < expectedRecords; ++i) {
+            OutputRecord<Integer> next = result.take();
+            assertEquals(OutputRecord.Event.PROCESS_ELEMENT, next.getEvent());
+            Tuple2<Integer, Integer> state =
+                    roundsStat.computeIfAbsent(next.getRound(), ignored -> new Tuple2<>(0, 0));
+            state.f0++;
+            state.f1 = next.getValue();
+        }
+
+        return roundsStat;
+    }
+
+    static void verifyResult(
+            Map<Integer, Tuple2<Integer, Integer>> roundsStat,
+            int expectedRound,
+            int recordsEachRound,
+            int valueEachRound) {
+        assertEquals(expectedRound, roundsStat.size());
+        for (int i = 0; i < expectedRound; ++i) {
+            assertEquals(recordsEachRound, (int) roundsStat.get(i).f0);
+            assertEquals(valueEachRound, (int) roundsStat.get(i).f1);
+        }
+    }
+}
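
To unpack the constants in these tests: each of the 4 source subtasks emits the values
0..999, so a single round carries 4 * 1000 = 4000 records; with maxRound = 1 the records
pass through rounds 0 and 1, giving the 2 * 4 * 1000 = 8000 PROCESS_ELEMENT records that
computeRoundStat consumes (the bounded variants additionally observe one TERMINATED
record, hence 8001). The reduced value per round is the arithmetic series
0 + 1 + ... + 999 = 999 * 1000 / 2 summed over the 4 sources, which is what
4 * (0 + 999) * 1000 / 2 = 1,998,000 expresses.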
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java
new file mode 100644
index 0000000..c9e1bad
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import java.util.concurrent.BlockingQueue;
+
+/** Collects the results into the given queue. */
+public class CollectSink implements SinkFunction<OutputRecord<Integer>> {
+
+    private final SharedReference<BlockingQueue<OutputRecord<Integer>>> result;
+
+    public CollectSink(SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
+        this.result = result;
+    }
+
+    @Override
+    public void invoke(OutputRecord<Integer> value, Context context) throws Exception {
+        result.get().add(value);
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java
new file mode 100644
index 0000000..b4825cb
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+/**
+ * A value and its epoch. This is a temporary implementation before we have determined how to
+ * notify operators about the records' epoch.
+ */
+public class EpochRecord {
+
+    private int epoch;
+
+    private int value;
+
+    public EpochRecord() {}
+
+    public EpochRecord(int epoch, int value) {
+        this.epoch = epoch;
+        this.value = value;
+    }
+
+    public int getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public void setValue(int value) {
+        this.value = value;
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java
new file mode 100644
index 0000000..a249a98
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/** Increments the epoch of the records. */
+public class IncrementEpochMap implements MapFunction<EpochRecord, EpochRecord> {
+
+    @Override
+    public EpochRecord map(EpochRecord record) throws Exception {
+        return new EpochRecord(record.getEpoch() + 1, record.getValue());
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java
new file mode 100644
index 0000000..44db276
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+/** The output record type. */
+public class OutputRecord<T> {
+
+    /** Possible events emitted by the iteration. */
+    public enum Event {
+        PROCESS_ELEMENT,
+        EPOCH_WATERMARK_INCREMENTED,
+        TERMINATED
+    }
+
+    private Event event;
+
+    private int round;
+
+    private T value;
+
+    public OutputRecord() {}
+
+    public OutputRecord(Event event, int round, T value) {
+        this.event = event;
+        this.round = round;
+        this.value = value;
+    }
+
+    public Event getEvent() {
+        return event;
+    }
+
+    public void setEvent(Event event) {
+        this.event = event;
+    }
+
+    public int getRound() {
+        return round;
+    }
+
+    public void setRound(int round) {
+        this.round = round;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "OutputRecord{" + "event=" + event + ", round=" + round + ", 
value=" + value + '}';
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
new file mode 100644
index 0000000..60c49fe
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * An operator that reduces the received numbers and emits the result into the output, and also
+ * emits the received numbers to the next operator.
+ */
+public class ReduceAllRoundProcessFunction extends ProcessFunction<EpochRecord, EpochRecord>
+        implements IterationListener<EpochRecord> {
+
+    private final boolean sync;
+
+    private final int maxRound;
+
+    private transient Map<Integer, Integer> sumByEpochs;
+
+    private transient List<EpochRecord> cachedRecords;
+
+    private transient OutputTag<OutputRecord<Integer>> outputTag;
+
+    public ReduceAllRoundProcessFunction(boolean sync, int maxRound) {
+        this.sync = sync;
+        this.maxRound = maxRound;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        sumByEpochs = new HashMap<>();
+        cachedRecords = new ArrayList<>();
+        outputTag = new OutputTag<OutputRecord<Integer>>("output") {};
+    }
+
+    @Override
+    public void processElement(
+            EpochRecord record,
+            ProcessFunction<EpochRecord, EpochRecord>.Context ctx,
+            Collector<EpochRecord> out)
+            throws Exception {
+        processRecord(record, ctx::output, out);
+    }
+
+    protected void processRecord(
+            EpochRecord record,
+            BiConsumer<OutputTag<OutputRecord<Integer>>, OutputRecord<Integer>> sideOutput,
+            Collector<EpochRecord> out) {
+        sumByEpochs.compute(
+                record.getEpoch(), (k, v) -> v == null ? record.getValue() : v + record.getValue());
+
+        if (record.getEpoch() < maxRound) {
+            if (!sync) {
+                out.collect(record);
+            } else {
+                cachedRecords.add(record);
+            }
+        }
+
+        if (!sync) {
+            sideOutput.accept(
+                    outputTag,
+                    new OutputRecord<>(
+                            OutputRecord.Event.PROCESS_ELEMENT,
+                            record.getEpoch(),
+                            sumByEpochs.get(record.getEpoch())));
+        }
+    }
+
+    @Override
+    public void onEpochWatermarkIncremented(
+            int epochWatermark,
+            IterationListener.Context context,
+            Collector<EpochRecord> collector) {
+        if (sync) {
+            context.output(
+                    outputTag,
+                    new OutputRecord<>(
+                            OutputRecord.Event.EPOCH_WATERMARK_INCREMENTED,
+                            epochWatermark,
+                            sumByEpochs.get(epochWatermark)));
+            cachedRecords.forEach(collector::collect);
+            cachedRecords.clear();
+        }
+    }
+
+    @Override
+    public void onIterationTerminated(
+            IterationListener.Context context, Collector<EpochRecord> collector) {
+        context.output(outputTag, new OutputRecord<>(OutputRecord.Event.TERMINATED, -1, -1));
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java
new file mode 100644
index 0000000..566e03d
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+/** A source emitting a continuous sequence of integers. */
+public class SequenceSource extends RichParallelSourceFunction<EpochRecord> {
+
+    private final int maxValue;
+
+    private final boolean holdAfterMaxValue;
+
+    private final int period;
+
+    private volatile boolean canceled;
+
+    public SequenceSource(int maxValue, boolean holdAfterMaxValue, int period) {
+        this.maxValue = maxValue;
+        this.holdAfterMaxValue = holdAfterMaxValue;
+        this.period = period;
+    }
+
+    @Override
+    public void run(SourceContext<EpochRecord> ctx) throws Exception {
+        for (int i = 0; i < maxValue && !canceled; ++i) {
+            ctx.collect(new EpochRecord(0, i));
+            if (period > 0) {
+                Thread.sleep(period);
+            }
+        }
+
+        if (holdAfterMaxValue) {
+            while (!canceled) {
+                Thread.sleep(5000);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        canceled = true;
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java
new file mode 100644
index 0000000..648f1dc
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * A two-input proxy of {@link ReduceAllRoundProcessFunction}. It assumes that input 1 is empty.
+ */
+public class TwoInputReduceAllRoundProcessFunction
+        extends CoProcessFunction<EpochRecord, EpochRecord, EpochRecord>
+        implements IterationListener<EpochRecord> {
+
+    private final ReduceAllRoundProcessFunction reduceAllRoundProcessFunction;
+
+    public TwoInputReduceAllRoundProcessFunction(boolean sync, int maxRound) {
+        this.reduceAllRoundProcessFunction = new ReduceAllRoundProcessFunction(sync, maxRound);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        reduceAllRoundProcessFunction.open(parameters);
+    }
+
+    @Override
+    public void onEpochWatermarkIncremented(
+            int epochWatermark,
+            IterationListener.Context context,
+            Collector<EpochRecord> collector) {
+        reduceAllRoundProcessFunction.onEpochWatermarkIncremented(
+                epochWatermark, context, collector);
+    }
+
+    @Override
+    public void onIterationTerminated(
+            IterationListener.Context context, Collector<EpochRecord> collector) {
+        reduceAllRoundProcessFunction.onIterationTerminated(context, collector);
+    }
+
+    @Override
+    public void processElement1(
+            EpochRecord record,
+            CoProcessFunction<EpochRecord, EpochRecord, EpochRecord>.Context ctx,
+            Collector<EpochRecord> out)
+            throws Exception {
+
+        // Processing the following rounds of messages.
+        reduceAllRoundProcessFunction.processRecord(record, ctx::output, out);
+    }
+
+    @Override
+    public void processElement2(
+            EpochRecord record,
+            CoProcessFunction<EpochRecord, EpochRecord, EpochRecord>.Context ctx,
+            Collector<EpochRecord> out)
+            throws Exception {
+
+        // Processing the first round of messages.
+        reduceAllRoundProcessFunction.processRecord(record, ctx::output, out);
+    }
+}
