Repository: flink
Updated Branches:
  refs/heads/master 5be27a23d -> 010f66547


[FLINK-8731] Replaced mockito with custom mock in TestInputChannel

This closes #6338


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/010f6654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/010f6654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/010f6654

Branch: refs/heads/master
Commit: 010f6654709d2212f3d81c7ad73c73f7ebd47ea8
Parents: 5be27a2
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Thu Jul 12 16:17:48 2018 +0200
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Mon Jul 16 14:01:22 2018 +0200

----------------------------------------------------------------------
 .../IteratorWrappingTestSingleInputGate.java    |  21 ++--
 .../partition/consumer/SingleInputGateTest.java |  14 +--
 .../partition/consumer/TestInputChannel.java    | 125 ++++++++++++-------
 .../partition/consumer/TestSingleInputGate.java |   2 +-
 .../partition/consumer/UnionInputGateTest.java  |  16 +--
 .../consumer/StreamTestSingleInputGate.java     |  73 +++++------
 6 files changed, 142 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 105e35f..a914733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -25,19 +25,16 @@ import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import 
org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.io.IOException;
 import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.mockito.Mockito.when;
 
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> 
extends TestSingleInputGate {
 
@@ -66,12 +63,12 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
 
                // The input iterator can produce an infinite stream. That's 
why we have to serialize each
                // record on demand and cannot do it upfront.
-               final Answer<Optional<BufferAndAvailability>> answer = new 
Answer<Optional<BufferAndAvailability>>() {
+               final BufferAndAvailabilityProvider answer = new 
BufferAndAvailabilityProvider() {
 
                        private boolean hasData = inputIterator.next(reuse) != 
null;
 
                        @Override
-                       public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                       public Optional<BufferAndAvailability> 
getBufferAvailability() throws IOException {
                                if (hasData) {
                                        serializer.clear();
                                        BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
@@ -83,22 +80,24 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
                                        // Call getCurrentBuffer to ensure size 
is set
                                        return Optional.of(new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
                                } else {
-                                       
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
+                                       inputChannel.setReleased();
 
-                                       return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false, 0));
+                                       return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+                                               false,
+                                               0));
                                }
                        }
                };
 
-               
when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
+               inputChannel.addBufferAndAvailability(answer);
 
-               inputGate.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel.getInputChannel());
+               inputGate.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel);
 
                return this;
        }
 
        public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
-               inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
+               inputGate.notifyChannelNonEmpty(inputChannel);
 
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c244668..7120327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -105,10 +105,10 @@ public class SingleInputGateTest {
                };
 
                inputGate.setInputChannel(
-                       new IntermediateResultPartitionID(), 
inputChannels[0].getInputChannel());
+                       new IntermediateResultPartitionID(), inputChannels[0]);
 
                inputGate.setInputChannel(
-                       new IntermediateResultPartitionID(), 
inputChannels[1].getInputChannel());
+                       new IntermediateResultPartitionID(), inputChannels[1]);
 
                // Test
                inputChannels[0].readBuffer();
@@ -117,8 +117,8 @@ public class SingleInputGateTest {
                inputChannels[1].readEndOfPartitionEvent();
                inputChannels[0].readEndOfPartitionEvent();
 
-               
inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
-               
inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+               inputGate.notifyChannelNonEmpty(inputChannels[0]);
+               inputGate.notifyChannelNonEmpty(inputChannels[1]);
 
                verifyBufferOrEvent(inputGate, true, 0, true);
                verifyBufferOrEvent(inputGate, true, 1, true);
@@ -141,16 +141,16 @@ public class SingleInputGateTest {
                };
 
                inputGate.setInputChannel(
-                       new IntermediateResultPartitionID(), 
inputChannels[0].getInputChannel());
+                       new IntermediateResultPartitionID(), inputChannels[0]);
 
                inputGate.setInputChannel(
-                       new IntermediateResultPartitionID(), 
inputChannels[1].getInputChannel());
+                       new IntermediateResultPartitionID(), inputChannels[1]);
 
                // Test
                inputChannels[0].readBuffer();
                inputChannels[0].readBuffer(false);
 
-               
inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+               inputGate.notifyChannelNonEmpty(inputChannels[0]);
 
                verifyBufferOrEvent(inputGate, true, 0, true);
                verifyBufferOrEvent(inputGate, true, 0, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 3ae3a8a..80e07f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,20 +39,16 @@ import static org.mockito.Mockito.when;
 /**
  * A mocked input channel.
  */
-public class TestInputChannel {
+public class TestInputChannel extends InputChannel {
 
-       private final InputChannel mock = Mockito.mock(InputChannel.class);
+       private final Queue<BufferAndAvailabilityProvider> buffers = new 
ConcurrentLinkedQueue<>();
 
-       private final SingleInputGate inputGate;
+       private BufferAndAvailabilityProvider lastProvider = null;
 
-       // Abusing Mockito here... ;)
-       protected OngoingStubbing<Optional<BufferAndAvailability>> stubbing;
+       private boolean isReleased = false;
 
-       public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-               checkArgument(channelIndex >= 0);
-               this.inputGate = checkNotNull(inputGate);
-
-               when(mock.getChannelIndex()).thenReturn(channelIndex);
+       TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+               super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, 
new SimpleCounter());
        }
 
        public TestInputChannel read(Buffer buffer) throws IOException, 
InterruptedException {
@@ -60,48 +56,40 @@ public class TestInputChannel {
        }
 
        public TestInputChannel read(Buffer buffer, boolean moreAvailable) 
throws IOException, InterruptedException {
-               if (stubbing == null) {
-                       stubbing = 
when(mock.getNextBuffer()).thenReturn(Optional.of(new 
BufferAndAvailability(buffer, moreAvailable, 0)));
-               } else {
-                       stubbing = stubbing.thenReturn(Optional.of(new 
BufferAndAvailability(buffer, moreAvailable, 0)));
-               }
+               addBufferAndAvailability(new BufferAndAvailability(buffer, 
moreAvailable, 0));
 
                return this;
        }
 
-       public TestInputChannel readBuffer() throws IOException, 
InterruptedException {
+       TestInputChannel readBuffer() throws IOException, InterruptedException {
                return readBuffer(true);
        }
 
-       public TestInputChannel readBuffer(boolean moreAvailable) throws 
IOException, InterruptedException {
+       TestInputChannel readBuffer(boolean moreAvailable) throws IOException, 
InterruptedException {
                final Buffer buffer = mock(Buffer.class);
                when(buffer.isBuffer()).thenReturn(true);
 
                return read(buffer, moreAvailable);
        }
 
-       public TestInputChannel readEndOfPartitionEvent() throws IOException, 
InterruptedException {
-               final Answer<Optional<BufferAndAvailability>> answer = new 
Answer<Optional<BufferAndAvailability>>() {
-                       @Override
-                       public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               // Return true after finishing
-                               when(mock.isReleased()).thenReturn(true);
-
-                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false, 0));
+       TestInputChannel readEndOfPartitionEvent() throws InterruptedException {
+               addBufferAndAvailability(
+                       () -> {
+                               setReleased();
+                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+                                       false,
+                                       0));
                        }
-               };
-
-               if (stubbing == null) {
-                       stubbing = 
when(mock.getNextBuffer()).thenAnswer(answer);
-               } else {
-                       stubbing = stubbing.thenAnswer(answer);
-               }
-
+               );
                return this;
        }
 
-       public InputChannel getInputChannel() {
-               return mock;
+       void addBufferAndAvailability(BufferAndAvailability 
bufferAndAvailability) {
+               buffers.add(() -> Optional.of(bufferAndAvailability));
+       }
+
+       void addBufferAndAvailability(BufferAndAvailabilityProvider 
bufferAndAvailability) {
+               buffers.add(bufferAndAvailability);
        }
 
        // 
------------------------------------------------------------------------
@@ -111,7 +99,7 @@ public class TestInputChannel {
         *
         * @return The created test input channels.
         */
-       public static TestInputChannel[] createInputChannels(SingleInputGate 
inputGate, int numberOfInputChannels) {
+       static TestInputChannel[] createInputChannels(SingleInputGate 
inputGate, int numberOfInputChannels) {
                checkNotNull(inputGate);
                checkArgument(numberOfInputChannels > 0);
 
@@ -120,9 +108,62 @@ public class TestInputChannel {
                for (int i = 0; i < numberOfInputChannels; i++) {
                        mocks[i] = new TestInputChannel(inputGate, i);
 
-                       inputGate.setInputChannel(new 
IntermediateResultPartitionID(), mocks[i].getInputChannel());
+                       inputGate.setInputChannel(new 
IntermediateResultPartitionID(), mocks[i]);
                }
 
                return mocks;
        }
+
+       @Override
+       void requestSubpartition(int subpartitionIndex) throws IOException, 
InterruptedException {
+
+       }
+
+       @Override
+       Optional<BufferAndAvailability> getNextBuffer() throws IOException, 
InterruptedException {
+               BufferAndAvailabilityProvider provider = buffers.poll();
+
+               if (provider != null) {
+                       lastProvider = provider;
+                       return provider.getBufferAvailability();
+               } else if (lastProvider != null) {
+                       return lastProvider.getBufferAvailability();
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       @Override
+       void sendTaskEvent(TaskEvent event) throws IOException {
+
+       }
+
+       @Override
+       boolean isReleased() {
+               return isReleased;
+       }
+
+       void setReleased() {
+               this.isReleased = true;
+       }
+
+       @Override
+       void notifySubpartitionConsumed() throws IOException {
+
+       }
+
+       @Override
+       void releaseAllResources() throws IOException {
+
+       }
+
+       @Override
+       protected void notifyChannelNonEmpty() {
+
+       }
+
+       interface BufferAndAvailabilityProvider {
+               Optional<BufferAndAvailability> getBufferAvailability() throws 
IOException, InterruptedException;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 33dc1ca..b0bafd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -98,7 +98,7 @@ public class TestSingleInputGate {
                if (initialize) {
                        for (int i = 0; i < numberOfInputChannels; i++) {
                                inputChannels[i] = new 
TestInputChannel(inputGate, i);
-                               inputGate.setInputChannel(new 
IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+                               inputGate.setInputChannel(new 
IntermediateResultPartitionID(), inputChannels[i]);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 081d97d..2e01225 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -86,15 +86,15 @@ public class UnionInputGateTest {
                inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
                inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
 
-               
ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
-               
ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
-               
ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+               ig1.notifyChannelNonEmpty(inputChannels[0][0]);
+               ig1.notifyChannelNonEmpty(inputChannels[0][1]);
+               ig1.notifyChannelNonEmpty(inputChannels[0][2]);
 
-               
ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
-               
ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
-               
ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
-               
ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
-               
ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+               ig2.notifyChannelNonEmpty(inputChannels[1][0]);
+               ig2.notifyChannelNonEmpty(inputChannels[1][1]);
+               ig2.notifyChannelNonEmpty(inputChannels[1][2]);
+               ig2.notifyChannelNonEmpty(inputChannels[1][3]);
+               ig2.notifyChannelNonEmpty(inputChannels[1][4]);
 
                verifyBufferOrEvent(union, true, 0, true); // gate 1, channel 0
                verifyBufferOrEvent(union, true, 3, true); // gate 2, channel 0

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 6ab8074..ea38382 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,14 +28,12 @@ import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import 
org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,7 +41,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
 
 /**
  * Test {@link InputGate} that allows setting multiple channels. Use
@@ -94,44 +91,40 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channelIndex] = new 
ConcurrentLinkedQueue<InputValue<Object>>();
                        inputChannels[channelIndex] = new 
TestInputChannel(inputGate, i);
 
-                       final Answer<Optional<BufferAndAvailability>> answer = 
new Answer<Optional<BufferAndAvailability>>() {
-                               @Override
-                               public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                       
ConcurrentLinkedQueue<InputValue<Object>> inputQueue = 
inputQueues[channelIndex];
-                                       InputValue<Object> input;
-                                       boolean moreAvailable;
-                                       synchronized (inputQueue) {
-                                               input = inputQueue.poll();
-                                               moreAvailable = 
!inputQueue.isEmpty();
-                                       }
-                                       if (input != null && 
input.isStreamEnd()) {
-                                               
when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
-                                                       true);
-                                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
moreAvailable, 0));
-                                       } else if (input != null && 
input.isStreamRecord()) {
-                                               Object inputElement = 
input.getStreamRecord();
-
-                                               BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
-                                               
recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
-                                               
delegate.setInstance(inputElement);
-                                               
recordSerializer.addRecord(delegate);
-                                               bufferBuilder.finish();
-
-                                               // Call getCurrentBuffer to 
ensure size is set
-                                               return Optional.of(new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
-                                       } else if (input != null && 
input.isEvent()) {
-                                               AbstractEvent event = 
input.getEvent();
-                                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
-                                       } else {
-                                               return Optional.empty();
-                                       }
+                       final BufferAndAvailabilityProvider answer = () -> {
+                               ConcurrentLinkedQueue<InputValue<Object>> 
inputQueue = inputQueues[channelIndex];
+                               InputValue<Object> input;
+                               boolean moreAvailable;
+                               synchronized (inputQueue) {
+                                       input = inputQueue.poll();
+                                       moreAvailable = !inputQueue.isEmpty();
+                               }
+                               if (input != null && input.isStreamEnd()) {
+                                       
inputChannels[channelIndex].setReleased();
+                                       return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
moreAvailable, 0));
+                               } else if (input != null && 
input.isStreamRecord()) {
+                                       Object inputElement = 
input.getStreamRecord();
+
+                                       BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
+                                       
recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
+                                       delegate.setInstance(inputElement);
+                                       recordSerializer.addRecord(delegate);
+                                       bufferBuilder.finish();
+
+                                       // Call getCurrentBuffer to ensure size 
is set
+                                       return Optional.of(new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
+                               } else if (input != null && input.isEvent()) {
+                                       AbstractEvent event = input.getEvent();
+                                       return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
+                               } else {
+                                       return Optional.empty();
                                }
                        };
 
-                       
when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
+                       
inputChannels[channelIndex].addBufferAndAvailability(answer);
 
                        inputGate.setInputChannel(new 
IntermediateResultPartitionID(),
-                               inputChannels[channelIndex].getInputChannel());
+                               inputChannels[channelIndex]);
                }
        }
 
@@ -140,7 +133,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channel].add(InputValue.element(element));
                        inputQueues[channel].notifyAll();
                }
-               
inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+               inputGate.notifyChannelNonEmpty(inputChannels[channel]);
        }
 
        public void sendEvent(AbstractEvent event, int channel) {
@@ -148,7 +141,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channel].add(InputValue.event(event));
                        inputQueues[channel].notifyAll();
                }
-               
inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+               inputGate.notifyChannelNonEmpty(inputChannels[channel]);
        }
 
        public void endInput() {
@@ -157,7 +150,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                inputQueues[i].add(InputValue.streamEnd());
                                inputQueues[i].notifyAll();
                        }
-                       
inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
+                       inputGate.notifyChannelNonEmpty(inputChannels[i]);
                }
        }
 

Reply via email to