This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
commit a1800b983b8851cc9aecd1fe115d90d88f3c6a58 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Mon Sep 27 00:36:36 2021 +0800 [FLINK-24646][iteration] Add progress tracker for aligning epoch watermarks --- flink-ml-iteration/pom.xml | 1 + .../OperatorEpochWatermarkTracker.java | 150 ++++++++++++ .../OperatorEpochWatermarkTrackerFactory.java | 71 ++++++ .../OperatorEpochWatermarkTrackerListener.java | 32 +++ .../OperatorEpochWatermarkTrackerFactoryTest.java | 271 +++++++++++++++++++++ .../OperatorEpochWatermarkTrackerTest.java | 91 +++++++ 6 files changed, 616 insertions(+) diff --git a/flink-ml-iteration/pom.xml b/flink-ml-iteration/pom.xml index a82ea4f..d2cdb36 100644 --- a/flink-ml-iteration/pom.xml +++ b/flink-ml-iteration/pom.xml @@ -104,6 +104,7 @@ under the License. <scope>test</scope> </dependency> + <!-- We depends on StreamTaskMailboxTestHarnessBuilder and it requires this dependency --> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java new file mode 100644 index 0000000..0c72246 --- /dev/null +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTracker.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.progresstrack; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Tracks the epoch watermark from each input. Once the minimum epoch watermark changed, it would + * notify the listener. + */ +public class OperatorEpochWatermarkTracker { + + private final OperatorEpochWatermarkTrackerListener progressTrackerListener; + + private final List<InputStatus> inputStatuses; + + private final LowerBoundMaintainer allInputsLowerBound; + + public OperatorEpochWatermarkTracker( + int[] numberOfChannels, OperatorEpochWatermarkTrackerListener progressTrackerListener) { + checkState(numberOfChannels != null && numberOfChannels.length >= 1); + this.progressTrackerListener = checkNotNull(progressTrackerListener); + + this.inputStatuses = new ArrayList<>(numberOfChannels.length); + for (int numberOfChannel : numberOfChannels) { + inputStatuses.add(new InputStatus(numberOfChannel)); + } + + this.allInputsLowerBound = new LowerBoundMaintainer(numberOfChannels.length); + } + + public void onEpochWatermark(int inputIndex, String sender, int epochWatermark) + throws IOException { + InputStatus inputStatus = inputStatuses.get(inputIndex); + inputStatus.onUpdate(sender, epochWatermark); + + if (inputStatus.getInputLowerBound() > allInputsLowerBound.getValue(inputIndex)) { + int oldLowerBound = allInputsLowerBound.getLowerBound(); + allInputsLowerBound.updateValue(inputIndex, inputStatus.getInputLowerBound()); + if (allInputsLowerBound.getLowerBound() > oldLowerBound) { + progressTrackerListener.onEpochWatermarkIncrement( + allInputsLowerBound.getLowerBound()); + } + } + } + + @VisibleForTesting + int[] getNumberOfInputs() { + return inputStatuses.stream() + .mapToInt(inputStatus -> inputStatus.numberOfChannels) + .toArray(); + } + + private static class InputStatus { + private final int numberOfChannels; + private final Map<String, Integer> senderIndices; + private final LowerBoundMaintainer allChannelsLowerBound; + + public InputStatus(int numberOfChannels) { + this.numberOfChannels = numberOfChannels; + this.senderIndices = new HashMap<>(numberOfChannels); + this.allChannelsLowerBound = new LowerBoundMaintainer(numberOfChannels); + } + + public void onUpdate(String sender, int epochWatermark) { + int index = senderIndices.computeIfAbsent(sender, k -> senderIndices.size()); + checkState(index < numberOfChannels); + + allChannelsLowerBound.updateValue(index, epochWatermark); + } + + public int getInputLowerBound() { + return allChannelsLowerBound.getLowerBound(); + } + } + + private static class LowerBoundMaintainer { + + private final int[] values; + + private int lowerBound; + + public LowerBoundMaintainer(int numberOfValues) { + this.values = new int[numberOfValues]; + Arrays.fill(values, Integer.MIN_VALUE); + lowerBound = Integer.MIN_VALUE; + } + + public int getLowerBound() { + return lowerBound; + } + + public int getValue(int channel) { + return values[channel]; + } + + public void updateValue(int channel, int value) { + checkState( + value > values[channel], + String.format( + "The channel %d received an outdated value %d, which currently is %d", + channel, value, values[channel])); + if (value > values[channel]) { + long oldValue = values[channel]; + values[channel] = value; + + if (oldValue == lowerBound) { + lowerBound = calculateLowerBound(); + } + } + } + + private int calculateLowerBound() { + int newLowerBound = values[0]; + for (int i = 1; i < values.length; ++i) { + if (values[i] < newLowerBound) { + newLowerBound = values[i]; + } + } + + return newLowerBound; + } + } +} diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java new file mode 100644 index 0000000..d15beb4 --- /dev/null +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.progresstrack; + +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +/** + * The factory of {@link OperatorEpochWatermarkTracker}. It analyzes the inputs of an operator and + * create the corresponding progress tracker. + */ +public class OperatorEpochWatermarkTrackerFactory { + + public static OperatorEpochWatermarkTracker create( + StreamConfig streamConfig, + StreamTask<?, ?> containingTask, + OperatorEpochWatermarkTrackerListener progressTrackerListener) { + + int[] numberOfChannels; + if (!streamConfig.isChainStart()) { + numberOfChannels = new int[] {1}; + } else { + InputGate[] inputGates = containingTask.getEnvironment().getAllInputGates(); + List<StreamEdge> inEdges = + streamConfig.getInPhysicalEdges(containingTask.getUserCodeClassLoader()); + + // Mapping the edge type (input number) into a continuous sequence start from 0. + // Currently for one-input operator, the type number is 0; for two-inputs and + // multiple-inputs, the type number is from 1 to N. We want to map them to [0, N - 1] + // uniformly. + TreeSet<Integer> edgeTypes = new TreeSet<>(); + inEdges.forEach(edge -> edgeTypes.add(edge.getTypeNumber())); + + Map<Integer, Integer> edgeTypeToIndices = new HashMap<>(); + for (int edgeType : edgeTypes) { + edgeTypeToIndices.put(edgeType, edgeTypeToIndices.size()); + } + + numberOfChannels = new int[edgeTypeToIndices.size()]; + for (int i = 0; i < inEdges.size(); ++i) { + numberOfChannels[edgeTypeToIndices.get(inEdges.get(i).getTypeNumber())] += + inputGates[i].getNumberOfInputChannels(); + } + } + + return new OperatorEpochWatermarkTracker(numberOfChannels, progressTrackerListener); + } +} diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java new file mode 100644 index 0000000..20884f2 --- /dev/null +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.progresstrack; + +import java.io.IOException; + +/** The listener of alignment. */ +public interface OperatorEpochWatermarkTrackerListener { + + /** + * Notifies a new round is aligned to a new epoch watermark. + * + * @param epochWatermark The new epoch watermark. + */ + void onEpochWatermarkIncrement(int epochWatermark) throws IOException; +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java new file mode 100644 index 0000000..1eb493a --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.progresstrack; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractInput; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test the {@link OperatorEpochWatermarkTracker} is created correctly according to the topology. + */ +public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger { + + private static OperatorEpochWatermarkTracker lastProgressTracker; + + @Test + public void testChainedOperator() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.addSource(new EmptySource()) + .transform( + "tracking", + BasicTypeInfo.INT_TYPE_INFO, + new OneInputProgressTrackingOperator(ChainingStrategy.ALWAYS)); + env.execute(); + + checkNumberOfInput(new int[] {1}); + } + + @Test + public void testOneInputOperator() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(new EmptySource()) + .setParallelism(4) + .transform( + "tracking", + BasicTypeInfo.INT_TYPE_INFO, + new OneInputProgressTrackingOperator(ChainingStrategy.NEVER)) + .setParallelism(1); + env.execute(); + + checkNumberOfInput(new int[] {4}); + } + + @Test + public void testUnionedOneInput() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(new EmptySource()) + .setParallelism(4) + .union(env.addSource(new EmptySource()).setParallelism(3)) + .union(env.addSource(new EmptySource()).setParallelism(2)) + .transform( + "tracking", + BasicTypeInfo.INT_TYPE_INFO, + new OneInputProgressTrackingOperator(ChainingStrategy.NEVER)) + .setParallelism(1); + env.execute(); + + checkNumberOfInput(new int[] {9}); + } + + @Test + public void testTwoInputOperator() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(new EmptySource()) + .setParallelism(4) + .connect(env.addSource(new EmptySource()).setParallelism(3)) + .transform( + "tracking", + BasicTypeInfo.INT_TYPE_INFO, + new TwoInputProgressTrackingOperator()) + .setParallelism(1); + env.execute(); + + checkNumberOfInput(new int[] {4, 3}); + } + + @Test + public void testUnionedTwoInputOperator() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(new EmptySource()) + .setParallelism(4) + .union(env.addSource(new EmptySource()).setParallelism(2)) + .connect(env.addSource(new EmptySource()).setParallelism(3)) + .transform( + "tracking", + BasicTypeInfo.INT_TYPE_INFO, + new TwoInputProgressTrackingOperator()) + .setParallelism(1); + env.execute(); + + checkNumberOfInput(new int[] {6, 3}); + } + + @Test + public void testMultipleInputOperator() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> first = + env.addSource(new EmptySource()) + .setParallelism(4) + .union(env.addSource(new EmptySource()).setParallelism(2)); + DataStream<Integer> second = + env.addSource(new EmptySource()) + .setParallelism(2) + .union(env.addSource(new EmptySource()).setParallelism(3)); + DataStream<Integer> third = env.addSource(new EmptySource()).setParallelism(10); + + MultipleInputTransformation<Integer> multipleInputTransformation = + new MultipleInputTransformation<>( + "tracking", + new MultipleInputProgressTrackingOperatorFactory(3), + BasicTypeInfo.INT_TYPE_INFO, + 1); + multipleInputTransformation.addInput(first.getTransformation()); + multipleInputTransformation.addInput(second.getTransformation()); + multipleInputTransformation.addInput(third.getTransformation()); + env.addOperator(multipleInputTransformation); + env.execute(); + + checkNumberOfInput(new int[] {6, 5, 10}); + } + + private void checkNumberOfInput(int[] numberOfInputs) { + assertNotNull(lastProgressTracker); + assertArrayEquals(numberOfInputs, lastProgressTracker.getNumberOfInputs()); + } + + private static class EmptySource implements ParallelSourceFunction<Integer> { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception {} + + @Override + public void cancel() {} + } + + private static class OneInputProgressTrackingOperator extends AbstractStreamOperator<Integer> + implements OneInputStreamOperator<Integer, Integer> { + + public OneInputProgressTrackingOperator(ChainingStrategy chainingStrategy) { + this.chainingStrategy = chainingStrategy; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<Integer>> output) { + super.setup(containingTask, config, output); + lastProgressTracker = + OperatorEpochWatermarkTrackerFactory.create( + config, containingTask, (ignored) -> {}); + } + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception {} + } + + private static class TwoInputProgressTrackingOperator extends AbstractStreamOperator<Integer> + implements TwoInputStreamOperator<Integer, Integer, Integer> { + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<Integer>> output) { + super.setup(containingTask, config, output); + lastProgressTracker = + OperatorEpochWatermarkTrackerFactory.create( + config, containingTask, (ignored) -> {}); + } + + @Override + public void processElement1(StreamRecord<Integer> element) throws Exception {} + + @Override + public void processElement2(StreamRecord<Integer> element) throws Exception {} + } + + private static class MultipleInputProgressTrackingOperator + extends AbstractStreamOperatorV2<Integer> + implements MultipleInputStreamOperator<Integer> { + + private final int numberOfInputs; + + public MultipleInputProgressTrackingOperator( + StreamOperatorParameters<Integer> parameters, int numberOfInputs) { + super(parameters, numberOfInputs); + this.numberOfInputs = numberOfInputs; + lastProgressTracker = + OperatorEpochWatermarkTrackerFactory.create( + config, parameters.getContainingTask(), (ignored) -> {}); + } + + @Override + public List<Input> getInputs() { + List<Input> inputs = new ArrayList<>(); + for (int i = 0; i < numberOfInputs; ++i) { + inputs.add( + new AbstractInput(this, i + 1) { + @Override + public void processElement(StreamRecord element) throws Exception {} + }); + } + return inputs; + } + } + + private static class MultipleInputProgressTrackingOperatorFactory + extends AbstractStreamOperatorFactory<Integer> { + + private final int numberOfInputs; + + public MultipleInputProgressTrackingOperatorFactory(int numberOfInputs) { + this.numberOfInputs = numberOfInputs; + } + + @Override + public <T extends StreamOperator<Integer>> T createStreamOperator( + StreamOperatorParameters<Integer> parameters) { + return (T) new MultipleInputProgressTrackingOperator(parameters, numberOfInputs); + } + + @Override + public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + return MultipleInputProgressTrackingOperator.class; + } + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java new file mode 100644 index 0000000..73a95b1 --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.progresstrack; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** Tests the logic of {@link OperatorEpochWatermarkTracker}. */ +public class OperatorEpochWatermarkTrackerTest extends TestLogger { + + @Test + public void testEpochWatermarkAlignment() throws IOException { + RecordingProgressListener recordingProgressListener = new RecordingProgressListener(); + int[] numberOfChannels = new int[] {2, 3}; + OperatorEpochWatermarkTracker progressTracker = + new OperatorEpochWatermarkTracker(numberOfChannels, recordingProgressListener); + + testOnEpochWatermark( + new int[] {0, 0, 0, 0, 1}, + progressTracker, + recordingProgressListener, + new int[] {0, 1, 1, 0, 1}, + new String[] {"0-0", "1-0", "1-1", "0-1", "1-2"}, + 2); + assertEquals(Collections.singletonList(2), recordingProgressListener.notifications); + + recordingProgressListener.reset(); + testOnEpochWatermark( + new int[] {0, 0, 0, 0, 1}, + progressTracker, + recordingProgressListener, + new int[] {0, 0, 1, 1, 1}, + new String[] {"0-0", "0-1", "1-0", "1-1", "1-2"}, + 3); + assertEquals(Collections.singletonList(3), recordingProgressListener.notifications); + } + + private void testOnEpochWatermark( + int[] expectedNumNotifications, + OperatorEpochWatermarkTracker tracker, + RecordingProgressListener recordingProgressListener, + int[] inputIndices, + String[] senders, + int incrementedEpochWatermark) + throws IOException { + for (int i = 0; i < expectedNumNotifications.length; ++i) { + tracker.onEpochWatermark(inputIndices[i], senders[i], incrementedEpochWatermark); + assertEquals( + expectedNumNotifications[i], recordingProgressListener.notifications.size()); + } + } + + private static class RecordingProgressListener + implements OperatorEpochWatermarkTrackerListener { + + final List<Integer> notifications = new ArrayList<>(); + + @Override + public void onEpochWatermarkIncrement(int epochWatermark) { + notifications.add(epochWatermark); + } + + public void reset() { + notifications.clear(); + } + } +}