This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd1e1f306d9e8e94421a4a0587182a2e0283adf0
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Sep 30 09:41:31 2020 +0200

    [hotfix] Do not cache DataOutput in StatusWatermarkValve
    
    StatusWatermarkValve was caching a DataOutput passed in a constructor. From 
an architecture point of view it is a bug, because it ignores the DataOutput 
passed in the DataInput#emitNext(DataOutput). It made and implicit assumption 
that these two are always equal, which might not be true.
    
    In this PR I am passing the DataOutput as a parameter of 
StatusWatermarkValve methods.
---
 .../runtime/io/StreamMultipleInputProcessor.java   |   2 +-
 .../runtime/io/StreamTaskNetworkInput.java         |   4 +-
 .../runtime/io/StreamTwoInputProcessor.java        |   4 +-
 .../runtime/streamstatus/StatusWatermarkValve.java |  22 +--
 .../runtime/tasks/OneInputStreamTask.java          |   6 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  12 +-
 .../streamstatus/StatusWatermarkValveTest.java     | 166 ++++++++++-----------
 7 files changed, 105 insertions(+), 111 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index 2a90062..6c6e31a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -123,7 +123,7 @@ public final class StreamMultipleInputProcessor implements 
StreamInputProcessor
                                                
checkpointedInputGates[networkInput.getInputGateIndex()],
                                                
networkInput.getTypeSerializer(),
                                                ioManager,
-                                               new 
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels(),
 dataOutput),
+                                               new 
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
                                                i));
                        }
                        else if (configuredInput instanceof SourceInputConfig) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 3d90b70..5084b8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -178,11 +178,11 @@ public final class StreamTaskNetworkInput<T> implements 
StreamTaskInput<T> {
                if (recordOrMark.isRecord()){
                        output.emitRecord(recordOrMark.asRecord());
                } else if (recordOrMark.isWatermark()) {
-                       
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
+                       
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, 
output);
                } else if (recordOrMark.isLatencyMarker()) {
                        
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
                } else if (recordOrMark.isStreamStatus()) {
-                       
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), 
lastChannel);
+                       
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), 
lastChannel, output);
                } else {
                        throw new UnsupportedOperationException("Unknown type 
of StreamElement");
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 3f3f285..32f5791 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -118,13 +118,13 @@ public final class StreamTwoInputProcessor<IN1, IN2> 
implements StreamInputProce
                        checkpointedInputGates[0],
                        inputSerializer1,
                        ioManager,
-                       new 
StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels(), 
output1),
+                       new 
StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
                        0);
                this.input2 = new StreamTaskNetworkInput<>(
                        checkpointedInputGates[1],
                        inputSerializer2,
                        ioManager,
-                       new 
StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels(), 
output2),
+                       new 
StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
                        1);
 
                this.operatorChain = checkNotNull(operatorChain);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index 43704d5..6e9ef98 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} 
and {@link StreamStatus} are propagated to
@@ -36,8 +35,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StatusWatermarkValve {
 
-       private final DataOutput output;
-
        // 
------------------------------------------------------------------------
        //      Runtime state for watermark & stream status output determination
        // 
------------------------------------------------------------------------
@@ -58,9 +55,8 @@ public class StatusWatermarkValve {
         * Returns a new {@code StatusWatermarkValve}.
         *
         * @param numInputChannels the number of input channels that this valve 
will need to handle
-        * @param output the customized output handler for the valve
         */
-       public StatusWatermarkValve(int numInputChannels, DataOutput output) {
+       public StatusWatermarkValve(int numInputChannels) {
                checkArgument(numInputChannels > 0);
                this.channelStatuses = new InputChannelStatus[numInputChannels];
                for (int i = 0; i < numInputChannels; i++) {
@@ -70,8 +66,6 @@ public class StatusWatermarkValve {
                        channelStatuses[i].isWatermarkAligned = true;
                }
 
-               this.output = checkNotNull(output);
-
                this.lastOutputWatermark = Long.MIN_VALUE;
                this.lastOutputStreamStatus = StreamStatus.ACTIVE;
        }
@@ -83,7 +77,7 @@ public class StatusWatermarkValve {
         * @param watermark the watermark to feed to the valve
         * @param channelIndex the index of the channel that the fed watermark 
belongs to (index starting from 0)
         */
-       public void inputWatermark(Watermark watermark, int channelIndex) 
throws Exception {
+       public void inputWatermark(Watermark watermark, int channelIndex, 
DataOutput<?> output) throws Exception {
                // ignore the input watermark if its input channel, or all 
input channels are idle (i.e. overall the valve is idle).
                if (lastOutputStreamStatus.isActive() && 
channelStatuses[channelIndex].streamStatus.isActive()) {
                        long watermarkMillis = watermark.getTimestamp();
@@ -98,7 +92,7 @@ public class StatusWatermarkValve {
                                }
 
                                // now, attempt to find a new min watermark 
across all aligned channels
-                               
findAndOutputNewMinWatermarkAcrossAlignedChannels();
+                               
findAndOutputNewMinWatermarkAcrossAlignedChannels(output);
                        }
                }
        }
@@ -111,7 +105,7 @@ public class StatusWatermarkValve {
         * @param streamStatus the stream status to feed to the valve
         * @param channelIndex the index of the channel that the fed stream 
status belongs to (index starting from 0)
         */
-       public void inputStreamStatus(StreamStatus streamStatus, int 
channelIndex) throws Exception {
+       public void inputStreamStatus(StreamStatus streamStatus, int 
channelIndex, DataOutput<?> output) throws Exception {
                // only account for stream status inputs that will result in a 
status change for the input channel
                if (streamStatus.isIdle() && 
channelStatuses[channelIndex].streamStatus.isActive()) {
                        // handle active -> idle toggle for the input channel
@@ -130,7 +124,7 @@ public class StatusWatermarkValve {
                                // the min watermark as channels individually 
become IDLE, here we only need to perform the flush
                                // if the watermark of the last active channel 
that just became idle is the current min watermark.
                                if (channelStatuses[channelIndex].watermark == 
lastOutputWatermark) {
-                                       
findAndOutputMaxWatermarkAcrossAllChannels();
+                                       
findAndOutputMaxWatermarkAcrossAllChannels(output);
                                }
 
                                lastOutputStreamStatus = StreamStatus.IDLE;
@@ -139,7 +133,7 @@ public class StatusWatermarkValve {
                                // if the watermark of the channel that just 
became idle equals the last output
                                // watermark (the previous overall min 
watermark), we may be able to find a new
                                // min watermark from the remaining aligned 
channels
-                               
findAndOutputNewMinWatermarkAcrossAlignedChannels();
+                               
findAndOutputNewMinWatermarkAcrossAlignedChannels(output);
                        }
                } else if (streamStatus.isActive() && 
channelStatuses[channelIndex].streamStatus.isIdle()) {
                        // handle idle -> active toggle for the input channel
@@ -160,7 +154,7 @@ public class StatusWatermarkValve {
                }
        }
 
-       private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws 
Exception {
+       private void 
findAndOutputNewMinWatermarkAcrossAlignedChannels(DataOutput<?> output) throws 
Exception {
                long newMinWatermark = Long.MAX_VALUE;
                boolean hasAlignedChannels = false;
 
@@ -180,7 +174,7 @@ public class StatusWatermarkValve {
                }
        }
 
-       private void findAndOutputMaxWatermarkAcrossAllChannels() throws 
Exception {
+       private void findAndOutputMaxWatermarkAcrossAllChannels(DataOutput<?> 
output) throws Exception {
                long maxWatermark = Long.MIN_VALUE;
 
                for (InputChannelStatus channelStatus : channelStatuses) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 35094d3..0a99eee 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -88,7 +88,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                        CheckpointedInputGate inputGate = 
createCheckpointedInputGate();
                        Counter numRecordsIn = 
setupNumRecordsInCounter(mainOperator);
                        DataOutput<IN> output = createDataOutput(numRecordsIn);
-                       StreamTaskInput<IN> input = createTaskInput(inputGate, 
output);
+                       StreamTaskInput<IN> input = createTaskInput(inputGate);
                        
getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn);
                        inputProcessor = new StreamOneInputProcessor<>(
                                input,
@@ -121,9 +121,9 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                        numRecordsIn);
        }
 
-       private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate 
inputGate, DataOutput<IN> output) {
+       private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate 
inputGate) {
                int numberOfInputChannels = 
inputGate.getNumberOfInputChannels();
-               StatusWatermarkValve statusWatermarkValve = new 
StatusWatermarkValve(numberOfInputChannels, output);
+               StatusWatermarkValve statusWatermarkValve = new 
StatusWatermarkValve(numberOfInputChannels);
 
                TypeSerializer<IN> inSerializer = 
configuration.getTypeSerializerIn1(getUserCodeClassLoader());
                return new StreamTaskNetworkInput<>(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 48f833d..ff9fac7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -88,7 +88,7 @@ public class StreamTaskNetworkInputTest {
                List<BufferOrEvent> buffers = 
Collections.singletonList(createDataBuffer());
 
                VerifyRecordsDataOutput output = new 
VerifyRecordsDataOutput<>();
-               StreamTaskNetworkInput input = 
createStreamTaskNetworkInput(buffers, output);
+               StreamTaskNetworkInput input = 
createStreamTaskNetworkInput(buffers);
 
                assertHasNextElement(input, output);
                assertHasNextElement(input, output);
@@ -108,7 +108,7 @@ public class StreamTaskNetworkInputTest {
                buffers.add(createDataBuffer());
 
                VerifyRecordsDataOutput output = new 
VerifyRecordsDataOutput<>();
-               StreamTaskNetworkInput input = 
createStreamTaskNetworkInput(buffers, output);
+               StreamTaskNetworkInput input = 
createStreamTaskNetworkInput(buffers);
 
                assertHasNextElement(input, output);
                assertEquals(0, output.getNumberOfEmittedRecords());
@@ -140,7 +140,7 @@ public class StreamTaskNetworkInputTest {
                                        inputGate.getInputGate()),
                                new SyncMailboxExecutor()),
                        inSerializer,
-                       new StatusWatermarkValve(numInputChannels, output),
+                       new StatusWatermarkValve(numInputChannels),
                        0,
                        deserializers);
 
@@ -184,7 +184,7 @@ public class StreamTaskNetworkInputTest {
                                new CheckpointBarrierTracker(1, new 
DummyCheckpointInvokable()),
                                new SyncMailboxExecutor()),
                        inSerializer,
-                       new StatusWatermarkValve(1, output),
+                       new StatusWatermarkValve(1),
                        0,
                        deserializers);
 
@@ -206,7 +206,7 @@ public class StreamTaskNetworkInputTest {
                return new BufferOrEvent(bufferConsumer.build(), new 
InputChannelInfo(0, 0));
        }
 
-       private StreamTaskNetworkInput 
createStreamTaskNetworkInput(List<BufferOrEvent> buffers, DataOutput output) {
+       private StreamTaskNetworkInput 
createStreamTaskNetworkInput(List<BufferOrEvent> buffers) {
                return new StreamTaskNetworkInput<>(
                        new CheckpointedInputGate(
                                new MockInputGate(1, buffers, false),
@@ -214,7 +214,7 @@ public class StreamTaskNetworkInputTest {
                                new SyncMailboxExecutor()),
                        LongSerializer.INSTANCE,
                        ioManager,
-                       new StatusWatermarkValve(1, output),
+                       new StatusWatermarkValve(1),
                        0);
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index ff20a16..fdb6d7d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -53,13 +53,13 @@ public class StatusWatermarkValveTest {
        @Test
        public void testSingleInputIncreasingWatermarks() throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(1, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(1);
 
-               valve.inputWatermark(new Watermark(0), 0);
+               valve.inputWatermark(new Watermark(0), 0, valveOutput);
                assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(25), 0);
+               valve.inputWatermark(new Watermark(25), 0, valveOutput);
                assertEquals(new Watermark(25), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -70,15 +70,15 @@ public class StatusWatermarkValveTest {
        @Test
        public void testSingleInputDecreasingWatermarksYieldsNoOutput() throws 
Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(1, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(1);
 
-               valve.inputWatermark(new Watermark(25), 0);
+               valve.inputWatermark(new Watermark(25), 0, valveOutput);
                assertEquals(new Watermark(25), 
valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(18), 0);
+               valve.inputWatermark(new Watermark(18), 0, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(42), 0);
+               valve.inputWatermark(new Watermark(42), 0, valveOutput);
                assertEquals(new Watermark(42), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -90,19 +90,19 @@ public class StatusWatermarkValveTest {
        @Test
        public void testSingleInputStreamStatusToggling() throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(1, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(1);
 
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput);
                // this also implicitly verifies that input channels start as 
ACTIVE
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
                assertEquals(StreamStatus.IDLE, 
valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput);
                assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -113,24 +113,24 @@ public class StatusWatermarkValveTest {
        @Test
        public void testSingleInputWatermarksIntactDuringIdleness() throws 
Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(1, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(1);
 
-               valve.inputWatermark(new Watermark(25), 0);
+               valve.inputWatermark(new Watermark(25), 0, valveOutput);
                assertEquals(new Watermark(25), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
                assertEquals(StreamStatus.IDLE, 
valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(50), 0);
+               valve.inputWatermark(new Watermark(50), 0, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
                assertEquals(25, valve.getInputChannelStatus(0).watermark);
 
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput);
                assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(50), 0);
+               valve.inputWatermark(new Watermark(50), 0, valveOutput);
                assertEquals(new Watermark(50), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -141,14 +141,14 @@ public class StatusWatermarkValveTest {
        @Test
        public void 
testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() throws 
Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(0), 0);
-               valve.inputWatermark(new Watermark(0), 1);
+               valve.inputWatermark(new Watermark(0), 0, valveOutput);
+               valve.inputWatermark(new Watermark(0), 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // now, all channels have watermarks
-               valve.inputWatermark(new Watermark(0), 2);
+               valve.inputWatermark(new Watermark(0), 2, valveOutput);
                assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -160,29 +160,29 @@ public class StatusWatermarkValveTest {
        @Test
        public void testMultipleInputIncreasingWatermarks() throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(0), 0);
-               valve.inputWatermark(new Watermark(0), 1);
-               valve.inputWatermark(new Watermark(0), 2);
+               valve.inputWatermark(new Watermark(0), 0, valveOutput);
+               valve.inputWatermark(new Watermark(0), 1, valveOutput);
+               valve.inputWatermark(new Watermark(0), 2, valveOutput);
                assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(12), 0);
-               valve.inputWatermark(new Watermark(8), 2);
-               valve.inputWatermark(new Watermark(10), 2);
+               valve.inputWatermark(new Watermark(12), 0, valveOutput);
+               valve.inputWatermark(new Watermark(8), 2, valveOutput);
+               valve.inputWatermark(new Watermark(10), 2, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(15), 1);
+               valve.inputWatermark(new Watermark(15), 1, valveOutput);
                // lowest watermark across all channels is now channel 2, with 
watermark @ 10
                assertEquals(new Watermark(10), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(17), 2);
+               valve.inputWatermark(new Watermark(17), 2, valveOutput);
                // lowest watermark across all channels is now channel 0, with 
watermark @ 12
                assertEquals(new Watermark(12), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(20), 0);
+               valve.inputWatermark(new Watermark(20), 0, valveOutput);
                // lowest watermark across all channels is now channel 1, with 
watermark @ 15
                assertEquals(new Watermark(15), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
@@ -194,16 +194,16 @@ public class StatusWatermarkValveTest {
        @Test
        public void testMultipleInputDecreasingWatermarksYieldsNoOutput() 
throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(25), 0);
-               valve.inputWatermark(new Watermark(10), 1);
-               valve.inputWatermark(new Watermark(17), 2);
+               valve.inputWatermark(new Watermark(25), 0, valveOutput);
+               valve.inputWatermark(new Watermark(10), 1, valveOutput);
+               valve.inputWatermark(new Watermark(17), 2, valveOutput);
                assertEquals(new Watermark(10), 
valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(12), 0);
-               valve.inputWatermark(new Watermark(8), 1);
-               valve.inputWatermark(new Watermark(15), 2);
+               valve.inputWatermark(new Watermark(12), 0, valveOutput);
+               valve.inputWatermark(new Watermark(8), 1, valveOutput);
+               valve.inputWatermark(new Watermark(15), 2, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
 
@@ -214,29 +214,29 @@ public class StatusWatermarkValveTest {
        @Test
        public void testMultipleInputStreamStatusToggling() throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(2, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(2);
 
                // this also implicitly verifies that all input channels start 
as active
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // now, all channels are IDLE
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
                assertEquals(StreamStatus.IDLE, 
valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
-               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
+               valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // as soon as at least one input becomes active again, the 
ACTIVE marker should be forwarded
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 1, valveOutput);
                assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput);
                // already back to ACTIVE, should yield no output
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -248,23 +248,23 @@ public class StatusWatermarkValveTest {
        @Test
        public void 
testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception 
{
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(15), 0);
-               valve.inputWatermark(new Watermark(10), 1);
+               valve.inputWatermark(new Watermark(15), 0, valveOutput);
+               valve.inputWatermark(new Watermark(10), 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput);
                // min watermark should be computed from remaining ACTIVE 
channels
                assertEquals(new Watermark(10), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(18), 1);
+               valve.inputWatermark(new Watermark(18), 1, valveOutput);
                // now, min watermark should be 15 from channel #0
                assertEquals(new Watermark(15), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputWatermark(new Watermark(20), 0);
+               valve.inputWatermark(new Watermark(20), 0, valveOutput);
                // now, min watermark should be 18 from channel #1
                assertEquals(new Watermark(18), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
@@ -277,18 +277,18 @@ public class StatusWatermarkValveTest {
        @Test
        public void 
testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() throws 
Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(25), 0);
-               valve.inputWatermark(new Watermark(10), 1);
-               valve.inputWatermark(new Watermark(17), 2);
+               valve.inputWatermark(new Watermark(25), 0, valveOutput);
+               valve.inputWatermark(new Watermark(10), 1, valveOutput);
+               valve.inputWatermark(new Watermark(17), 2, valveOutput);
                assertEquals(new Watermark(10), 
valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput);
                // only channel 0 & 2 is ACTIVE; 17 is the overall min 
watermark now
                assertEquals(new Watermark(17), 
valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput);
                // only channel 0 is ACTIVE; 25 is the overall min watermark now
                assertEquals(new Watermark(25), 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
@@ -305,7 +305,7 @@ public class StatusWatermarkValveTest {
        @Test
        public void 
testMultipleInputFlushMaxWatermarkAndStreamStatusOnceAllInputsBecomeIdle() 
throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
                // 
-------------------------------------------------------------------------------------------
                // Setup valve for test case:
@@ -315,9 +315,9 @@ public class StatusWatermarkValveTest {
                //  Min Watermark across channels = 3 (from channel #3)
                // 
-------------------------------------------------------------------------------------------
 
-               valve.inputWatermark(new Watermark(10), 0);
-               valve.inputWatermark(new Watermark(5), 1);
-               valve.inputWatermark(new Watermark(3), 2);
+               valve.inputWatermark(new Watermark(10), 0, valveOutput);
+               valve.inputWatermark(new Watermark(5), 1, valveOutput);
+               valve.inputWatermark(new Watermark(3), 2, valveOutput);
                assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
 
                // 
-------------------------------------------------------------------------------------------
@@ -326,11 +326,11 @@ public class StatusWatermarkValveTest {
                //   |-> (nothing emitted)        |-> (nothing emitted)        
|-> Emit Watermark(10) & IDLE
                // 
-------------------------------------------------------------------------------------------
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
-               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
+               valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput);
                assertEquals(new Watermark(10), 
valveOutput.popLastSeenOutput());
                assertEquals(StreamStatus.IDLE, 
valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
@@ -343,35 +343,35 @@ public class StatusWatermarkValveTest {
        @Test
        public void testMultipleInputWatermarkRealignmentAfterResumeActive() 
throws Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(10), 0);
-               valve.inputWatermark(new Watermark(7), 1);
-               valve.inputWatermark(new Watermark(3), 2);
+               valve.inputWatermark(new Watermark(10), 0, valveOutput);
+               valve.inputWatermark(new Watermark(7), 1, valveOutput);
+               valve.inputWatermark(new Watermark(3), 2, valveOutput);
                assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
-               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput);
                assertEquals(new Watermark(7), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // let channel 2 become active again; since the min watermark 
has now advanced to 7,
                // channel 2 should have been marked as non-aligned.
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 2, valveOutput);
                assertFalse(valve.getInputChannelStatus(2).isWatermarkAligned);
 
                // during the realignment process, watermarks should still be 
accepted by channel 2 (but shouldn't yield new watermarks)
-               valve.inputWatermark(new Watermark(5), 2);
+               valve.inputWatermark(new Watermark(5), 2, valveOutput);
                assertEquals(5, valve.getInputChannelStatus(2).watermark);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // let channel 2 catch up with the min watermark; now should be 
realigned
-               valve.inputWatermark(new Watermark(9), 2);
+               valve.inputWatermark(new Watermark(9), 2, valveOutput);
                assertTrue(valve.getInputChannelStatus(2).isWatermarkAligned);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // check that realigned inputs is now taken into account for 
watermark advancement
-               valve.inputWatermark(new Watermark(12), 1);
+               valve.inputWatermark(new Watermark(12), 1, valveOutput);
                assertEquals(new Watermark(9), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
        }
@@ -384,23 +384,23 @@ public class StatusWatermarkValveTest {
        @Test
        public void testNoOutputWhenAllActiveChannelsAreUnaligned() throws 
Exception {
                StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
-               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+               StatusWatermarkValve valve = new StatusWatermarkValve(3);
 
-               valve.inputWatermark(new Watermark(10), 0);
-               valve.inputWatermark(new Watermark(7), 1);
+               valve.inputWatermark(new Watermark(10), 0, valveOutput);
+               valve.inputWatermark(new Watermark(7), 1, valveOutput);
 
                // make channel 2 ACTIVE, it is now in "catch up" mode 
(unaligned watermark)
-               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput);
                assertEquals(new Watermark(7), valveOutput.popLastSeenOutput());
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // make channel 2 ACTIVE again, it is still unaligned
-               valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 2, valveOutput);
                assertEquals(null, valveOutput.popLastSeenOutput());
 
                // make channel 0 and 1 IDLE, now channel 2 is the only ACTIVE 
channel but it's unaligned
-               valve.inputStreamStatus(StreamStatus.IDLE, 0);
-               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput);
+               valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput);
 
                // we should not see any output
                assertEquals(null, valveOutput.popLastSeenOutput());

Reply via email to