[
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287907#comment-16287907
]
ASF GitHub Bot commented on FLINK-8178:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/5105#discussion_r156407089
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---
@@ -144,51 +136,12 @@ public void addInputGate(InputGate gate) {
inputs.add(gate);
}
- public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
+ public <T> void addOutput(final Collection<Object> outputList, final TypeSerializer<T> serializer) {
try {
- // The record-oriented writers wrap the buffer writer. We mock it
- // to collect the returned buffers and deserialize the content to
- // the output list
- BufferProvider mockBufferProvider = mock(BufferProvider.class);
- when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- return new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
- }
- });
-
- ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
- when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
-
- final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
- final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
-
- // Add records and events from the buffer to the output list
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
- addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
- return null;
- }
- }).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
- addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
--- End diff --
Since you moved the contents of `addBufferToOutputList()` into
`RecordOrEventCollectingResultPartitionWriter<T>`, this method is now unused
and can be removed.
> Introduce not threadsafe write only BufferBuilder
> -------------------------------------------------
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires
> synchronisation. Currently this is misleading/unclear and suggests that
> RecordSerializer should take into account synchronisation of the Buffer
> that it is holding. With a NotThreadSafe BufferBuilder there would be a clear
> separation between single-threaded writing/creating a BufferBuilder and
> multithreaded Buffer handling/retaining/recycling.
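
To illustrate the separation described in the issue, here is a minimal,
self-contained sketch. It is not Flink's implementation: the names
`BufferBuilderSketch`, `Builder`, and `SharedBuffer` are hypothetical, and a
plain `java.nio.ByteBuffer` stands in for a memory segment. The builder is
write-only and meant for a single thread; the object returned by `build()` is
read-only and reference counted, so it can be handed to other threads.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class BufferBuilderSketch {

    /** Read-only, reference-counted result; intended to be shared across threads. */
    static final class SharedBuffer {
        private final ByteBuffer data; // read-only view, only absolute reads are used
        private final AtomicInteger refCount = new AtomicInteger(1);

        SharedBuffer(ByteBuffer data) {
            this.data = data;
        }

        /** Registers one more holder of this buffer. */
        SharedBuffer retain() {
            refCount.incrementAndGet();
            return this;
        }

        /** Drops one holder; returns true when the last reference is gone. */
        boolean recycle() {
            return refCount.decrementAndGet() == 0;
        }

        int size() {
            return data.remaining();
        }

        byte get(int index) {
            return data.get(index); // absolute read, does not move the position
        }
    }

    /** Write-only builder. NOT thread-safe: owned by exactly one writer thread. */
    static final class Builder {
        private final ByteBuffer target;
        private boolean built;

        Builder(int capacity) {
            this.target = ByteBuffer.allocate(capacity);
        }

        /** Copies as many bytes as still fit and returns how many were copied. */
        int append(byte[] source) {
            if (built) {
                throw new IllegalStateException("builder was already built");
            }
            int n = Math.min(source.length, target.remaining());
            target.put(source, 0, n);
            return n;
        }

        boolean isFull() {
            return !target.hasRemaining();
        }

        /** Ends the single-threaded phase and exposes the written bytes read-only. */
        SharedBuffer build() {
            built = true;
            ByteBuffer view = target.duplicate();
            view.flip(); // limit = bytes written, position = 0
            return new SharedBuffer(view.asReadOnlyBuffer());
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder(4);
        builder.append(new byte[] {1, 2, 3});
        builder.append(new byte[] {4, 5}); // only one byte still fits
        SharedBuffer buffer = builder.build();
        System.out.println("size=" + buffer.size() + " full=" + builder.isFull());
    }
}
```

In this sketch no synchronisation is needed while appending, because nothing
else can see the data until `build()` returns; only the reference count of the
shared result is updated atomically.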