[ https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287907#comment-16287907 ]
ASF GitHub Bot commented on FLINK-8178:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5105#discussion_r156407089

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---
    @@ -144,51 +136,12 @@ public void addInputGate(InputGate gate) {
             inputs.add(gate);
         }

    -    public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
    +    public <T> void addOutput(final Collection<Object> outputList, final TypeSerializer<T> serializer) {
             try {
    -            // The record-oriented writers wrap the buffer writer. We mock it
    -            // to collect the returned buffers and deserialize the content to
    -            // the output list
    -            BufferProvider mockBufferProvider = mock(BufferProvider.class);
    -            when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
    -
    -                @Override
    -                public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
    -                    return new Buffer(
    -                        MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
    -                        mock(BufferRecycler.class));
    -                }
    -            });
    -
    -            ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
    -            when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
    -            when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
    -
    -            final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
    -            final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
    -
    -            // Add records and events from the buffer to the output list
    -            doAnswer(new Answer<Void>() {
    -
    -                @Override
    -                public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
    -                    Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
    -                    addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
    -                    return null;
    -                }
    -            }).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
    -
    -            doAnswer(new Answer<Void>() {
    -
    -                @Override
    -                public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
    -                    Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
    -                    addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
    --- End diff --

    Since you moved the contents of `addBufferToOutputList()` into `RecordOrEventCollectingResultPartitionWriter<T>`, you can remove this now-unused method.

> Introduce not threadsafe write only BufferBuilder
> -------------------------------------------------
>
>                 Key: FLINK-8178
>                 URL: https://issues.apache.org/jira/browse/FLINK-8178
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>             Fix For: 1.5.0
>
>
> Because the Buffer class is used in a multithreaded context, it requires
> synchronisation. At present this is misleading/unclear, as it suggests that
> RecordSerializer should take into account synchronisation of the Buffer
> that it is holding. With a NotThreadSafe BufferBuilder there would be a clear
> separation between single-threaded writing/creating of a BufferBuilder and
> multithreaded Buffer handling/retaining/recycling.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)