This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit a1800b983b8851cc9aecd1fe115d90d88f3c6a58
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Mon Sep 27 00:36:36 2021 +0800

    [FLINK-24646][iteration] Add progress tracker for aligning epoch watermarks
---
 flink-ml-iteration/pom.xml                         |   1 +
 .../OperatorEpochWatermarkTracker.java             | 150 ++++++++++++
 .../OperatorEpochWatermarkTrackerFactory.java      |  71 ++++++
 .../OperatorEpochWatermarkTrackerListener.java     |  32 +++
 .../OperatorEpochWatermarkTrackerFactoryTest.java  | 271 +++++++++++++++++++++
 .../OperatorEpochWatermarkTrackerTest.java         |  91 +++++++
 6 files changed, 616 insertions(+)

diff --git a/flink-ml-iteration/pom.xml b/flink-ml-iteration/pom.xml
index a82ea4f..d2cdb36 100644
--- a/flink-ml-iteration/pom.xml
+++ b/flink-ml-iteration/pom.xml
@@ -104,6 +104,7 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- We depend on StreamTaskMailboxTestHarnessBuilder, which requires 
this dependency -->
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java
new file mode 100644
index 0000000..0c72246
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.progresstrack;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Tracks the epoch watermark from each input. Once the minimum epoch 
watermark increases, it
+ * notifies the listener.
+ */
+public class OperatorEpochWatermarkTracker {
+
+    private final OperatorEpochWatermarkTrackerListener 
progressTrackerListener;
+
+    private final List<InputStatus> inputStatuses;
+
+    private final LowerBoundMaintainer allInputsLowerBound;
+
+    public OperatorEpochWatermarkTracker(
+            int[] numberOfChannels, OperatorEpochWatermarkTrackerListener 
progressTrackerListener) {
+        checkState(numberOfChannels != null && numberOfChannels.length >= 1);
+        this.progressTrackerListener = checkNotNull(progressTrackerListener);
+
+        this.inputStatuses = new ArrayList<>(numberOfChannels.length);
+        for (int numberOfChannel : numberOfChannels) {
+            inputStatuses.add(new InputStatus(numberOfChannel));
+        }
+
+        this.allInputsLowerBound = new 
LowerBoundMaintainer(numberOfChannels.length);
+    }
+
+    public void onEpochWatermark(int inputIndex, String sender, int 
epochWatermark)
+            throws IOException {
+        InputStatus inputStatus = inputStatuses.get(inputIndex);
+        inputStatus.onUpdate(sender, epochWatermark);
+
+        if (inputStatus.getInputLowerBound() > 
allInputsLowerBound.getValue(inputIndex)) {
+            int oldLowerBound = allInputsLowerBound.getLowerBound();
+            allInputsLowerBound.updateValue(inputIndex, 
inputStatus.getInputLowerBound());
+            if (allInputsLowerBound.getLowerBound() > oldLowerBound) {
+                progressTrackerListener.onEpochWatermarkIncrement(
+                        allInputsLowerBound.getLowerBound());
+            }
+        }
+    }
+
+    @VisibleForTesting
+    int[] getNumberOfInputs() {
+        return inputStatuses.stream()
+                .mapToInt(inputStatus -> inputStatus.numberOfChannels)
+                .toArray();
+    }
+
+    private static class InputStatus {
+        private final int numberOfChannels;
+        private final Map<String, Integer> senderIndices;
+        private final LowerBoundMaintainer allChannelsLowerBound;
+
+        public InputStatus(int numberOfChannels) {
+            this.numberOfChannels = numberOfChannels;
+            this.senderIndices = new HashMap<>(numberOfChannels);
+            this.allChannelsLowerBound = new 
LowerBoundMaintainer(numberOfChannels);
+        }
+
+        public void onUpdate(String sender, int epochWatermark) {
+            int index = senderIndices.computeIfAbsent(sender, k -> 
senderIndices.size());
+            checkState(index < numberOfChannels);
+
+            allChannelsLowerBound.updateValue(index, epochWatermark);
+        }
+
+        public int getInputLowerBound() {
+            return allChannelsLowerBound.getLowerBound();
+        }
+    }
+
+    private static class LowerBoundMaintainer {
+
+        private final int[] values;
+
+        private int lowerBound;
+
+        public LowerBoundMaintainer(int numberOfValues) {
+            this.values = new int[numberOfValues];
+            Arrays.fill(values, Integer.MIN_VALUE);
+            lowerBound = Integer.MIN_VALUE;
+        }
+
+        public int getLowerBound() {
+            return lowerBound;
+        }
+
+        public int getValue(int channel) {
+            return values[channel];
+        }
+
+        public void updateValue(int channel, int value) {
+            checkState(
+                    value > values[channel],
+                    String.format(
+                            "The channel %d received an outdated value %d, 
which currently is %d",
+                            channel, value, values[channel]));
+            if (value > values[channel]) {
+                long oldValue = values[channel];
+                values[channel] = value;
+
+                if (oldValue == lowerBound) {
+                    lowerBound = calculateLowerBound();
+                }
+            }
+        }
+
+        private int calculateLowerBound() {
+            int newLowerBound = values[0];
+            for (int i = 1; i < values.length; ++i) {
+                if (values[i] < newLowerBound) {
+                    newLowerBound = values[i];
+                }
+            }
+
+            return newLowerBound;
+        }
+    }
+}
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java
new file mode 100644
index 0000000..d15beb4
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.progresstrack;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * The factory of {@link OperatorEpochWatermarkTracker}. It analyzes the 
inputs of an operator and
+ * creates the corresponding progress tracker.
+ */
+public class OperatorEpochWatermarkTrackerFactory {
+
+    public static OperatorEpochWatermarkTracker create(
+            StreamConfig streamConfig,
+            StreamTask<?, ?> containingTask,
+            OperatorEpochWatermarkTrackerListener progressTrackerListener) {
+
+        int[] numberOfChannels;
+        if (!streamConfig.isChainStart()) {
+            numberOfChannels = new int[] {1};
+        } else {
+            InputGate[] inputGates = 
containingTask.getEnvironment().getAllInputGates();
+            List<StreamEdge> inEdges =
+                    
streamConfig.getInPhysicalEdges(containingTask.getUserCodeClassLoader());
+
+            // Maps the edge types (input numbers) to a continuous sequence 
starting from 0.
+            // Currently for one-input operators, the type number is 0; for 
two-input and
+            // multiple-input operators, the type numbers are from 1 to N. We want to map 
them to [0, N - 1]
+            // uniformly.
+            TreeSet<Integer> edgeTypes = new TreeSet<>();
+            inEdges.forEach(edge -> edgeTypes.add(edge.getTypeNumber()));
+
+            Map<Integer, Integer> edgeTypeToIndices = new HashMap<>();
+            for (int edgeType : edgeTypes) {
+                edgeTypeToIndices.put(edgeType, edgeTypeToIndices.size());
+            }
+
+            numberOfChannels = new int[edgeTypeToIndices.size()];
+            for (int i = 0; i < inEdges.size(); ++i) {
+                
numberOfChannels[edgeTypeToIndices.get(inEdges.get(i).getTypeNumber())] +=
+                        inputGates[i].getNumberOfInputChannels();
+            }
+        }
+
+        return new OperatorEpochWatermarkTracker(numberOfChannels, 
progressTrackerListener);
+    }
+}
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java
new file mode 100644
index 0000000..20884f2
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.progresstrack;
+
+import java.io.IOException;
+
+/** The listener that is notified when the aligned epoch watermark is incremented. */
+public interface OperatorEpochWatermarkTrackerListener {
+
+    /**
+     * Notifies that a new round is aligned to a new epoch watermark.
+     *
+     * @param epochWatermark The new epoch watermark.
+     */
+    void onEpochWatermarkIncrement(int epochWatermark) throws IOException;
+}
diff --git 
a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
new file mode 100644
index 0000000..1eb493a
--- /dev/null
+++ 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.progresstrack;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests that the {@link OperatorEpochWatermarkTracker} is created correctly 
according to the topology.
+ */
+public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
+
+    private static OperatorEpochWatermarkTracker lastProgressTracker;
+
+    @Test
+    public void testChainedOperator() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.addSource(new EmptySource())
+                .transform(
+                        "tracking",
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        new 
OneInputProgressTrackingOperator(ChainingStrategy.ALWAYS));
+        env.execute();
+
+        checkNumberOfInput(new int[] {1});
+    }
+
+    @Test
+    public void testOneInputOperator() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.addSource(new EmptySource())
+                .setParallelism(4)
+                .transform(
+                        "tracking",
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        new 
OneInputProgressTrackingOperator(ChainingStrategy.NEVER))
+                .setParallelism(1);
+        env.execute();
+
+        checkNumberOfInput(new int[] {4});
+    }
+
+    @Test
+    public void testUnionedOneInput() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.addSource(new EmptySource())
+                .setParallelism(4)
+                .union(env.addSource(new EmptySource()).setParallelism(3))
+                .union(env.addSource(new EmptySource()).setParallelism(2))
+                .transform(
+                        "tracking",
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        new 
OneInputProgressTrackingOperator(ChainingStrategy.NEVER))
+                .setParallelism(1);
+        env.execute();
+
+        checkNumberOfInput(new int[] {9});
+    }
+
+    @Test
+    public void testTwoInputOperator() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.addSource(new EmptySource())
+                .setParallelism(4)
+                .connect(env.addSource(new EmptySource()).setParallelism(3))
+                .transform(
+                        "tracking",
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        new TwoInputProgressTrackingOperator())
+                .setParallelism(1);
+        env.execute();
+
+        checkNumberOfInput(new int[] {4, 3});
+    }
+
+    @Test
+    public void testUnionedTwoInputOperator() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.addSource(new EmptySource())
+                .setParallelism(4)
+                .union(env.addSource(new EmptySource()).setParallelism(2))
+                .connect(env.addSource(new EmptySource()).setParallelism(3))
+                .transform(
+                        "tracking",
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        new TwoInputProgressTrackingOperator())
+                .setParallelism(1);
+        env.execute();
+
+        checkNumberOfInput(new int[] {6, 3});
+    }
+
+    @Test
+    public void testMultipleInputOperator() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Integer> first =
+                env.addSource(new EmptySource())
+                        .setParallelism(4)
+                        .union(env.addSource(new 
EmptySource()).setParallelism(2));
+        DataStream<Integer> second =
+                env.addSource(new EmptySource())
+                        .setParallelism(2)
+                        .union(env.addSource(new 
EmptySource()).setParallelism(3));
+        DataStream<Integer> third = env.addSource(new 
EmptySource()).setParallelism(10);
+
+        MultipleInputTransformation<Integer> multipleInputTransformation =
+                new MultipleInputTransformation<>(
+                        "tracking",
+                        new MultipleInputProgressTrackingOperatorFactory(3),
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        1);
+        multipleInputTransformation.addInput(first.getTransformation());
+        multipleInputTransformation.addInput(second.getTransformation());
+        multipleInputTransformation.addInput(third.getTransformation());
+        env.addOperator(multipleInputTransformation);
+        env.execute();
+
+        checkNumberOfInput(new int[] {6, 5, 10});
+    }
+
+    private void checkNumberOfInput(int[] numberOfInputs) {
+        assertNotNull(lastProgressTracker);
+        assertArrayEquals(numberOfInputs, 
lastProgressTracker.getNumberOfInputs());
+    }
+
+    private static class EmptySource implements 
ParallelSourceFunction<Integer> {
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {}
+
+        @Override
+        public void cancel() {}
+    }
+
+    private static class OneInputProgressTrackingOperator extends 
AbstractStreamOperator<Integer>
+            implements OneInputStreamOperator<Integer, Integer> {
+
+        public OneInputProgressTrackingOperator(ChainingStrategy 
chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        @Override
+        public void setup(
+                StreamTask<?, ?> containingTask,
+                StreamConfig config,
+                Output<StreamRecord<Integer>> output) {
+            super.setup(containingTask, config, output);
+            lastProgressTracker =
+                    OperatorEpochWatermarkTrackerFactory.create(
+                            config, containingTask, (ignored) -> {});
+        }
+
+        @Override
+        public void processElement(StreamRecord<Integer> element) throws 
Exception {}
+    }
+
+    private static class TwoInputProgressTrackingOperator extends 
AbstractStreamOperator<Integer>
+            implements TwoInputStreamOperator<Integer, Integer, Integer> {
+
+        @Override
+        public void setup(
+                StreamTask<?, ?> containingTask,
+                StreamConfig config,
+                Output<StreamRecord<Integer>> output) {
+            super.setup(containingTask, config, output);
+            lastProgressTracker =
+                    OperatorEpochWatermarkTrackerFactory.create(
+                            config, containingTask, (ignored) -> {});
+        }
+
+        @Override
+        public void processElement1(StreamRecord<Integer> element) throws 
Exception {}
+
+        @Override
+        public void processElement2(StreamRecord<Integer> element) throws 
Exception {}
+    }
+
+    private static class MultipleInputProgressTrackingOperator
+            extends AbstractStreamOperatorV2<Integer>
+            implements MultipleInputStreamOperator<Integer> {
+
+        private final int numberOfInputs;
+
+        public MultipleInputProgressTrackingOperator(
+                StreamOperatorParameters<Integer> parameters, int 
numberOfInputs) {
+            super(parameters, numberOfInputs);
+            this.numberOfInputs = numberOfInputs;
+            lastProgressTracker =
+                    OperatorEpochWatermarkTrackerFactory.create(
+                            config, parameters.getContainingTask(), (ignored) 
-> {});
+        }
+
+        @Override
+        public List<Input> getInputs() {
+            List<Input> inputs = new ArrayList<>();
+            for (int i = 0; i < numberOfInputs; ++i) {
+                inputs.add(
+                        new AbstractInput(this, i + 1) {
+                            @Override
+                            public void processElement(StreamRecord element) 
throws Exception {}
+                        });
+            }
+            return inputs;
+        }
+    }
+
+    private static class MultipleInputProgressTrackingOperatorFactory
+            extends AbstractStreamOperatorFactory<Integer> {
+
+        private final int numberOfInputs;
+
+        public MultipleInputProgressTrackingOperatorFactory(int 
numberOfInputs) {
+            this.numberOfInputs = numberOfInputs;
+        }
+
+        @Override
+        public <T extends StreamOperator<Integer>> T createStreamOperator(
+                StreamOperatorParameters<Integer> parameters) {
+            return (T) new MultipleInputProgressTrackingOperator(parameters, 
numberOfInputs);
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return MultipleInputProgressTrackingOperator.class;
+        }
+    }
+}
diff --git 
a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java
 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java
new file mode 100644
index 0000000..73a95b1
--- /dev/null
+++ 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.progresstrack;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the logic of {@link OperatorEpochWatermarkTracker}. */
+public class OperatorEpochWatermarkTrackerTest extends TestLogger {
+
+    @Test
+    public void testEpochWatermarkAlignment() throws IOException {
+        RecordingProgressListener recordingProgressListener = new 
RecordingProgressListener();
+        int[] numberOfChannels = new int[] {2, 3};
+        OperatorEpochWatermarkTracker progressTracker =
+                new OperatorEpochWatermarkTracker(numberOfChannels, 
recordingProgressListener);
+
+        testOnEpochWatermark(
+                new int[] {0, 0, 0, 0, 1},
+                progressTracker,
+                recordingProgressListener,
+                new int[] {0, 1, 1, 0, 1},
+                new String[] {"0-0", "1-0", "1-1", "0-1", "1-2"},
+                2);
+        assertEquals(Collections.singletonList(2), 
recordingProgressListener.notifications);
+
+        recordingProgressListener.reset();
+        testOnEpochWatermark(
+                new int[] {0, 0, 0, 0, 1},
+                progressTracker,
+                recordingProgressListener,
+                new int[] {0, 0, 1, 1, 1},
+                new String[] {"0-0", "0-1", "1-0", "1-1", "1-2"},
+                3);
+        assertEquals(Collections.singletonList(3), 
recordingProgressListener.notifications);
+    }
+
+    private void testOnEpochWatermark(
+            int[] expectedNumNotifications,
+            OperatorEpochWatermarkTracker tracker,
+            RecordingProgressListener recordingProgressListener,
+            int[] inputIndices,
+            String[] senders,
+            int incrementedEpochWatermark)
+            throws IOException {
+        for (int i = 0; i < expectedNumNotifications.length; ++i) {
+            tracker.onEpochWatermark(inputIndices[i], senders[i], 
incrementedEpochWatermark);
+            assertEquals(
+                    expectedNumNotifications[i], 
recordingProgressListener.notifications.size());
+        }
+    }
+
+    private static class RecordingProgressListener
+            implements OperatorEpochWatermarkTrackerListener {
+
+        final List<Integer> notifications = new ArrayList<>();
+
+        @Override
+        public void onEpochWatermarkIncrement(int epochWatermark) {
+            notifications.add(epochWatermark);
+        }
+
+        public void reset() {
+            notifications.clear();
+        }
+    }
+}

Reply via email to