This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd1e1f306d9e8e94421a4a0587182a2e0283adf0 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Sep 30 09:41:31 2020 +0200 [hotfix] Do not cache DataOutput in StatusWatermarkValve StatusWatermarkValve was caching a DataOutput passed in a constructor. From an architecture point of view it is a bug, because it ignores the DataOutput passed in the DataInput#emitNext(DataOutput). It made an implicit assumption that these two are always equal, which might not be true. In this PR I am passing the DataOutput as a parameter of StatusWatermarkValve methods. --- .../runtime/io/StreamMultipleInputProcessor.java | 2 +- .../runtime/io/StreamTaskNetworkInput.java | 4 +- .../runtime/io/StreamTwoInputProcessor.java | 4 +- .../runtime/streamstatus/StatusWatermarkValve.java | 22 +-- .../runtime/tasks/OneInputStreamTask.java | 6 +- .../runtime/io/StreamTaskNetworkInputTest.java | 12 +- .../streamstatus/StatusWatermarkValveTest.java | 166 ++++++++++----------- 7 files changed, 105 insertions(+), 111 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index 2a90062..6c6e31a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -123,7 +123,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor checkpointedInputGates[networkInput.getInputGateIndex()], networkInput.getTypeSerializer(), ioManager, - new StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels(), dataOutput), + new StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()), i)); } else if (configuredInput instanceof 
SourceInputConfig) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index 3d90b70..5084b8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -178,11 +178,11 @@ public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> { if (recordOrMark.isRecord()){ output.emitRecord(recordOrMark.asRecord()); } else if (recordOrMark.isWatermark()) { - statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel); + statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output); } else if (recordOrMark.isLatencyMarker()) { output.emitLatencyMarker(recordOrMark.asLatencyMarker()); } else if (recordOrMark.isStreamStatus()) { - statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel); + statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output); } else { throw new UnsupportedOperationException("Unknown type of StreamElement"); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 3f3f285..32f5791 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -118,13 +118,13 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce checkpointedInputGates[0], inputSerializer1, ioManager, - new StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels(), output1), + new 
StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()), 0); this.input2 = new StreamTaskNetworkInput<>( checkpointedInputGates[1], inputSerializer2, ioManager, - new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels(), output2), + new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()), 1); this.operatorChain = checkNotNull(operatorChain); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java index 43704d5..6e9ef98 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java @@ -25,7 +25,6 @@ import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to @@ -36,8 +35,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StatusWatermarkValve { - private final DataOutput output; - // ------------------------------------------------------------------------ // Runtime state for watermark & stream status output determination // ------------------------------------------------------------------------ @@ -58,9 +55,8 @@ public class StatusWatermarkValve { * Returns a new {@code StatusWatermarkValve}. 
* * @param numInputChannels the number of input channels that this valve will need to handle - * @param output the customized output handler for the valve */ - public StatusWatermarkValve(int numInputChannels, DataOutput output) { + public StatusWatermarkValve(int numInputChannels) { checkArgument(numInputChannels > 0); this.channelStatuses = new InputChannelStatus[numInputChannels]; for (int i = 0; i < numInputChannels; i++) { @@ -70,8 +66,6 @@ public class StatusWatermarkValve { channelStatuses[i].isWatermarkAligned = true; } - this.output = checkNotNull(output); - this.lastOutputWatermark = Long.MIN_VALUE; this.lastOutputStreamStatus = StreamStatus.ACTIVE; } @@ -83,7 +77,7 @@ public class StatusWatermarkValve { * @param watermark the watermark to feed to the valve * @param channelIndex the index of the channel that the fed watermark belongs to (index starting from 0) */ - public void inputWatermark(Watermark watermark, int channelIndex) throws Exception { + public void inputWatermark(Watermark watermark, int channelIndex, DataOutput<?> output) throws Exception { // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle). 
if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) { long watermarkMillis = watermark.getTimestamp(); @@ -98,7 +92,7 @@ public class StatusWatermarkValve { } // now, attempt to find a new min watermark across all aligned channels - findAndOutputNewMinWatermarkAcrossAlignedChannels(); + findAndOutputNewMinWatermarkAcrossAlignedChannels(output); } } } @@ -111,7 +105,7 @@ public class StatusWatermarkValve { * @param streamStatus the stream status to feed to the valve * @param channelIndex the index of the channel that the fed stream status belongs to (index starting from 0) */ - public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) throws Exception { + public void inputStreamStatus(StreamStatus streamStatus, int channelIndex, DataOutput<?> output) throws Exception { // only account for stream status inputs that will result in a status change for the input channel if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) { // handle active -> idle toggle for the input channel @@ -130,7 +124,7 @@ public class StatusWatermarkValve { // the min watermark as channels individually become IDLE, here we only need to perform the flush // if the watermark of the last active channel that just became idle is the current min watermark. 
if (channelStatuses[channelIndex].watermark == lastOutputWatermark) { - findAndOutputMaxWatermarkAcrossAllChannels(); + findAndOutputMaxWatermarkAcrossAllChannels(output); } lastOutputStreamStatus = StreamStatus.IDLE; @@ -139,7 +133,7 @@ public class StatusWatermarkValve { // if the watermark of the channel that just became idle equals the last output // watermark (the previous overall min watermark), we may be able to find a new // min watermark from the remaining aligned channels - findAndOutputNewMinWatermarkAcrossAlignedChannels(); + findAndOutputNewMinWatermarkAcrossAlignedChannels(output); } } else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) { // handle idle -> active toggle for the input channel @@ -160,7 +154,7 @@ public class StatusWatermarkValve { } } - private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception { + private void findAndOutputNewMinWatermarkAcrossAlignedChannels(DataOutput<?> output) throws Exception { long newMinWatermark = Long.MAX_VALUE; boolean hasAlignedChannels = false; @@ -180,7 +174,7 @@ public class StatusWatermarkValve { } } - private void findAndOutputMaxWatermarkAcrossAllChannels() throws Exception { + private void findAndOutputMaxWatermarkAcrossAllChannels(DataOutput<?> output) throws Exception { long maxWatermark = Long.MIN_VALUE; for (InputChannelStatus channelStatus : channelStatuses) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 35094d3..0a99eee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -88,7 +88,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO CheckpointedInputGate inputGate 
= createCheckpointedInputGate(); Counter numRecordsIn = setupNumRecordsInCounter(mainOperator); DataOutput<IN> output = createDataOutput(numRecordsIn); - StreamTaskInput<IN> input = createTaskInput(inputGate, output); + StreamTaskInput<IN> input = createTaskInput(inputGate); getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn); inputProcessor = new StreamOneInputProcessor<>( input, @@ -121,9 +121,9 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO numRecordsIn); } - private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate, DataOutput<IN> output) { + private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate) { int numberOfInputChannels = inputGate.getNumberOfInputChannels(); - StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels, output); + StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels); TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); return new StreamTaskNetworkInput<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java index 48f833d..ff9fac7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java @@ -88,7 +88,7 @@ public class StreamTaskNetworkInputTest { List<BufferOrEvent> buffers = Collections.singletonList(createDataBuffer()); VerifyRecordsDataOutput output = new VerifyRecordsDataOutput<>(); - StreamTaskNetworkInput input = createStreamTaskNetworkInput(buffers, output); + StreamTaskNetworkInput input = createStreamTaskNetworkInput(buffers); assertHasNextElement(input, output); 
assertHasNextElement(input, output); @@ -108,7 +108,7 @@ public class StreamTaskNetworkInputTest { buffers.add(createDataBuffer()); VerifyRecordsDataOutput output = new VerifyRecordsDataOutput<>(); - StreamTaskNetworkInput input = createStreamTaskNetworkInput(buffers, output); + StreamTaskNetworkInput input = createStreamTaskNetworkInput(buffers); assertHasNextElement(input, output); assertEquals(0, output.getNumberOfEmittedRecords()); @@ -140,7 +140,7 @@ public class StreamTaskNetworkInputTest { inputGate.getInputGate()), new SyncMailboxExecutor()), inSerializer, - new StatusWatermarkValve(numInputChannels, output), + new StatusWatermarkValve(numInputChannels), 0, deserializers); @@ -184,7 +184,7 @@ public class StreamTaskNetworkInputTest { new CheckpointBarrierTracker(1, new DummyCheckpointInvokable()), new SyncMailboxExecutor()), inSerializer, - new StatusWatermarkValve(1, output), + new StatusWatermarkValve(1), 0, deserializers); @@ -206,7 +206,7 @@ public class StreamTaskNetworkInputTest { return new BufferOrEvent(bufferConsumer.build(), new InputChannelInfo(0, 0)); } - private StreamTaskNetworkInput createStreamTaskNetworkInput(List<BufferOrEvent> buffers, DataOutput output) { + private StreamTaskNetworkInput createStreamTaskNetworkInput(List<BufferOrEvent> buffers) { return new StreamTaskNetworkInput<>( new CheckpointedInputGate( new MockInputGate(1, buffers, false), @@ -214,7 +214,7 @@ public class StreamTaskNetworkInputTest { new SyncMailboxExecutor()), LongSerializer.INSTANCE, ioManager, - new StatusWatermarkValve(1, output), + new StatusWatermarkValve(1), 0); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java index ff20a16..fdb6d7d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java @@ -53,13 +53,13 @@ public class StatusWatermarkValveTest { @Test public void testSingleInputIncreasingWatermarks() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(1); - valve.inputWatermark(new Watermark(0), 0); + valve.inputWatermark(new Watermark(0), 0, valveOutput); assertEquals(new Watermark(0), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(25), 0); + valve.inputWatermark(new Watermark(25), 0, valveOutput); assertEquals(new Watermark(25), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -70,15 +70,15 @@ public class StatusWatermarkValveTest { @Test public void testSingleInputDecreasingWatermarksYieldsNoOutput() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(1); - valve.inputWatermark(new Watermark(25), 0); + valve.inputWatermark(new Watermark(25), 0, valveOutput); assertEquals(new Watermark(25), valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(18), 0); + valve.inputWatermark(new Watermark(18), 0, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(42), 0); + valve.inputWatermark(new Watermark(42), 0, valveOutput); assertEquals(new Watermark(42), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -90,19 +90,19 @@ public class StatusWatermarkValveTest { @Test public void testSingleInputStreamStatusToggling() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - 
StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(1); - valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput); // this also implicitly verifies that input channels start as ACTIVE assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 0); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 0); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput); assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -113,24 +113,24 @@ public class StatusWatermarkValveTest { @Test public void testSingleInputWatermarksIntactDuringIdleness() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(1); - valve.inputWatermark(new Watermark(25), 0); + valve.inputWatermark(new Watermark(25), 0, valveOutput); assertEquals(new Watermark(25), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 0); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(50), 0); + valve.inputWatermark(new Watermark(50), 0, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); assertEquals(25, valve.getInputChannelStatus(0).watermark); - valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + 
valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput); assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(50), 0); + valve.inputWatermark(new Watermark(50), 0, valveOutput); assertEquals(new Watermark(50), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -141,14 +141,14 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(0), 0); - valve.inputWatermark(new Watermark(0), 1); + valve.inputWatermark(new Watermark(0), 0, valveOutput); + valve.inputWatermark(new Watermark(0), 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); // now, all channels have watermarks - valve.inputWatermark(new Watermark(0), 2); + valve.inputWatermark(new Watermark(0), 2, valveOutput); assertEquals(new Watermark(0), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -160,29 +160,29 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputIncreasingWatermarks() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(0), 0); - valve.inputWatermark(new Watermark(0), 1); - valve.inputWatermark(new Watermark(0), 2); + valve.inputWatermark(new Watermark(0), 0, valveOutput); + valve.inputWatermark(new Watermark(0), 1, valveOutput); + valve.inputWatermark(new Watermark(0), 2, valveOutput); assertEquals(new Watermark(0), 
valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(12), 0); - valve.inputWatermark(new Watermark(8), 2); - valve.inputWatermark(new Watermark(10), 2); + valve.inputWatermark(new Watermark(12), 0, valveOutput); + valve.inputWatermark(new Watermark(8), 2, valveOutput); + valve.inputWatermark(new Watermark(10), 2, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(15), 1); + valve.inputWatermark(new Watermark(15), 1, valveOutput); // lowest watermark across all channels is now channel 2, with watermark @ 10 assertEquals(new Watermark(10), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(17), 2); + valve.inputWatermark(new Watermark(17), 2, valveOutput); // lowest watermark across all channels is now channel 0, with watermark @ 12 assertEquals(new Watermark(12), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(20), 0); + valve.inputWatermark(new Watermark(20), 0, valveOutput); // lowest watermark across all channels is now channel 1, with watermark @ 15 assertEquals(new Watermark(15), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); @@ -194,16 +194,16 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputDecreasingWatermarksYieldsNoOutput() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(25), 0); - valve.inputWatermark(new Watermark(10), 1); - valve.inputWatermark(new Watermark(17), 2); + valve.inputWatermark(new Watermark(25), 0, valveOutput); + valve.inputWatermark(new Watermark(10), 1, valveOutput); + valve.inputWatermark(new Watermark(17), 2, valveOutput); assertEquals(new 
Watermark(10), valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(12), 0); - valve.inputWatermark(new Watermark(8), 1); - valve.inputWatermark(new Watermark(15), 2); + valve.inputWatermark(new Watermark(12), 0, valveOutput); + valve.inputWatermark(new Watermark(8), 1, valveOutput); + valve.inputWatermark(new Watermark(15), 2, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -214,29 +214,29 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputStreamStatusToggling() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(2, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(2); // this also implicitly verifies that all input channels start as active - valve.inputStreamStatus(StreamStatus.ACTIVE, 0); - valve.inputStreamStatus(StreamStatus.ACTIVE, 1); + valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput); + valve.inputStreamStatus(StreamStatus.ACTIVE, 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 1); + valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); // now, all channels are IDLE - valve.inputStreamStatus(StreamStatus.IDLE, 0); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 0); - valve.inputStreamStatus(StreamStatus.IDLE, 1); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); + valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); // as soon as at least one input becomes active again, the ACTIVE marker should be forwarded - valve.inputStreamStatus(StreamStatus.ACTIVE, 1); + valve.inputStreamStatus(StreamStatus.ACTIVE, 1, valveOutput); assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + valve.inputStreamStatus(StreamStatus.ACTIVE, 0, valveOutput); // already back to ACTIVE, should yield no output assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -248,23 +248,23 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(15), 0); - valve.inputWatermark(new Watermark(10), 1); + valve.inputWatermark(new Watermark(15), 0, valveOutput); + valve.inputWatermark(new Watermark(10), 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 2); + valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput); // min watermark should be computed from remaining ACTIVE channels assertEquals(new Watermark(10), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(18), 1); + valve.inputWatermark(new Watermark(18), 1, valveOutput); // now, min watermark should be 15 from channel #0 assertEquals(new Watermark(15), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputWatermark(new Watermark(20), 0); + valve.inputWatermark(new Watermark(20), 0, valveOutput); // now, min watermark should be 18 from channel #1 assertEquals(new Watermark(18), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); @@ -277,18 +277,18 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new 
StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(25), 0); - valve.inputWatermark(new Watermark(10), 1); - valve.inputWatermark(new Watermark(17), 2); + valve.inputWatermark(new Watermark(25), 0, valveOutput); + valve.inputWatermark(new Watermark(10), 1, valveOutput); + valve.inputWatermark(new Watermark(17), 2, valveOutput); assertEquals(new Watermark(10), valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 1); + valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput); // only channel 0 & 2 is ACTIVE; 17 is the overall min watermark now assertEquals(new Watermark(17), valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 2); + valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput); // only channel 0 is ACTIVE; 25 is the overall min watermark now assertEquals(new Watermark(25), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); @@ -305,7 +305,7 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputFlushMaxWatermarkAndStreamStatusOnceAllInputsBecomeIdle() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); // ------------------------------------------------------------------------------------------- // Setup valve for test case: @@ -315,9 +315,9 @@ public class StatusWatermarkValveTest { // Min Watermark across channels = 3 (from channel #3) // ------------------------------------------------------------------------------------------- - valve.inputWatermark(new Watermark(10), 0); - valve.inputWatermark(new Watermark(5), 1); - valve.inputWatermark(new Watermark(3), 2); + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(5), 1, valveOutput); + 
valve.inputWatermark(new Watermark(3), 2, valveOutput); assertEquals(new Watermark(3), valveOutput.popLastSeenOutput()); // ------------------------------------------------------------------------------------------- @@ -326,11 +326,11 @@ public class StatusWatermarkValveTest { // |-> (nothing emitted) |-> (nothing emitted) |-> Emit Watermark(10) & IDLE // ------------------------------------------------------------------------------------------- - valve.inputStreamStatus(StreamStatus.IDLE, 0); - valve.inputStreamStatus(StreamStatus.IDLE, 1); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); + valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 2); + valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput); assertEquals(new Watermark(10), valveOutput.popLastSeenOutput()); assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); @@ -343,35 +343,35 @@ public class StatusWatermarkValveTest { @Test public void testMultipleInputWatermarkRealignmentAfterResumeActive() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(10), 0); - valve.inputWatermark(new Watermark(7), 1); - valve.inputWatermark(new Watermark(3), 2); + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(7), 1, valveOutput); + valve.inputWatermark(new Watermark(3), 2, valveOutput); assertEquals(new Watermark(3), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); - valve.inputStreamStatus(StreamStatus.IDLE, 2); + valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput); assertEquals(new Watermark(7), valveOutput.popLastSeenOutput()); assertEquals(null, 
valveOutput.popLastSeenOutput()); // let channel 2 become active again; since the min watermark has now advanced to 7, // channel 2 should have been marked as non-aligned. - valve.inputStreamStatus(StreamStatus.ACTIVE, 2); + valve.inputStreamStatus(StreamStatus.ACTIVE, 2, valveOutput); assertFalse(valve.getInputChannelStatus(2).isWatermarkAligned); // during the realignment process, watermarks should still be accepted by channel 2 (but shouldn't yield new watermarks) - valve.inputWatermark(new Watermark(5), 2); + valve.inputWatermark(new Watermark(5), 2, valveOutput); assertEquals(5, valve.getInputChannelStatus(2).watermark); assertEquals(null, valveOutput.popLastSeenOutput()); // let channel 2 catch up with the min watermark; now should be realigned - valve.inputWatermark(new Watermark(9), 2); + valve.inputWatermark(new Watermark(9), 2, valveOutput); assertTrue(valve.getInputChannelStatus(2).isWatermarkAligned); assertEquals(null, valveOutput.popLastSeenOutput()); // check that realigned inputs is now taken into account for watermark advancement - valve.inputWatermark(new Watermark(12), 1); + valve.inputWatermark(new Watermark(12), 1, valveOutput); assertEquals(new Watermark(9), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); } @@ -384,23 +384,23 @@ public class StatusWatermarkValveTest { @Test public void testNoOutputWhenAllActiveChannelsAreUnaligned() throws Exception { StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); - StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + StatusWatermarkValve valve = new StatusWatermarkValve(3); - valve.inputWatermark(new Watermark(10), 0); - valve.inputWatermark(new Watermark(7), 1); + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(7), 1, valveOutput); // make channel 2 ACTIVE, it is now in "catch up" mode (unaligned watermark) - valve.inputStreamStatus(StreamStatus.IDLE, 2); + 
valve.inputStreamStatus(StreamStatus.IDLE, 2, valveOutput); assertEquals(new Watermark(7), valveOutput.popLastSeenOutput()); assertEquals(null, valveOutput.popLastSeenOutput()); // make channel 2 ACTIVE again, it is still unaligned - valve.inputStreamStatus(StreamStatus.ACTIVE, 2); + valve.inputStreamStatus(StreamStatus.ACTIVE, 2, valveOutput); assertEquals(null, valveOutput.popLastSeenOutput()); // make channel 0 and 1 IDLE, now channel 2 is the only ACTIVE channel but it's unaligned - valve.inputStreamStatus(StreamStatus.IDLE, 0); - valve.inputStreamStatus(StreamStatus.IDLE, 1); + valve.inputStreamStatus(StreamStatus.IDLE, 0, valveOutput); + valve.inputStreamStatus(StreamStatus.IDLE, 1, valveOutput); // we should not see any output assertEquals(null, valveOutput.popLastSeenOutput());