[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217663737

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, In
 	}
 
 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(!bufferBuilders[targetChannel].isPresent());
```

Review comment:
The call from `getBufferBuilder` does not need this `checkState`. The call from `copyFromSerializerToTargetChannel`, however, may need it to guard against bugs. I removed it only to reduce some overhead. I will restore this check. :)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
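The restored guard can be illustrated with a minimal sketch. `BufferBuilder` is a hypothetical empty stand-in, and `checkState` is a local helper with the usual precondition contract. A real implementation would request the buffer from a pool instead of constructing it. The sketch shows why the check catches bugs: requesting a second builder for a channel that still holds an unfinished one would otherwise overwrite the slot silently.

```java
import java.util.Optional;

class RequestGuardSketch {
    /** Hypothetical stand-in for Flink's BufferBuilder. */
    static final class BufferBuilder {}

    /** Local helper with the same contract as a checkState precondition. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    private final Optional<BufferBuilder>[] bufferBuilders;

    @SuppressWarnings("unchecked")
    RequestGuardSketch(int numChannels) {
        bufferBuilders = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = Optional.empty();
        }
    }

    BufferBuilder requestNewBufferBuilder(int targetChannel) {
        // A channel must not already hold an unfinished builder; otherwise the
        // old one would be overwritten silently and its data lost.
        checkState(!bufferBuilders[targetChannel].isPresent(),
            "channel " + targetChannel + " still holds an unfinished BufferBuilder");
        BufferBuilder builder = new BufferBuilder(); // a real implementation would ask a buffer pool
        bufferBuilders[targetChannel] = Optional.of(builder);
        return builder;
    }

    /** Clears the slot, as finishing the current builder would. */
    void finish(int targetChannel) {
        bufferBuilders[targetChannel] = Optional.empty();
    }
}
```

The check costs one `isPresent()` call per new buffer, not per record. That is why keeping it is cheap compared with the bug it can reveal.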
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217661048

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, InterruptedException {
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		serializer.serializeRecord(record);
-
-		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-			serializer.prune();
-		}
+		emit(record, new int[] { rng.nextInt(numChannels) });
```

Review comment:
Got it.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217619644

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 		this.numChannels = writer.getNumberOfSubpartitions();
 
-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();
 		this.bufferBuilders = new Optional[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
 			bufferBuilders[i] = Optional.empty();
 		}
 	}
 
 	public void emit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		serializer.prune();
 	}
 
 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		serializer.prune();
 	}
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
-	}
+		serializer.serializeRecord(record);
 
-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+		copyToTarget(rng.nextInt(numChannels));
 
-		SerializationResult result = serializer.addRecord(record);
+		serializer.prune();
+	}
 
+	private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
+			tryFinishCurrentBufferBuilder(targetChannel);
```

Review comment:
Thanks for this further suggestion! I agree with keeping the loop logic simple and reducing the overhead related to the `BufferBuilder` array. I adjusted the process slightly differently from the code above. I think `bufferBuilders[targetChannel] = Optional.ofNullable(bufferBuilder)` does not need to be called on every copy, because it only matters once the `while` loop is entered.
In the common case of small records, one `BufferBuilder` can hold many serialization results. So I still set the `BufferBuilder` array only after requesting a new buffer, which adds just a little overhead when a single serialized record spans multiple `BufferBuilder`s. What do you think?
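The loop described here can be modelled in a few lines. This is a minimal sketch, not the PR's code. It assumes a hypothetical `BufferBuilder` stand-in that only counts bytes, and it uses a plain array with `null` meaning "no open builder" instead of `Optional`. The point it shows is that the per-channel slot is written only when a builder is requested or finished, never once per copy. Small records therefore keep reusing the open builder.

```java
import java.util.ArrayList;
import java.util.List;

class CopyLoopSketch {
    /** Hypothetical fixed-capacity stand-in for Flink's BufferBuilder. */
    static final class BufferBuilder {
        final int capacity;
        int written;

        BufferBuilder(int capacity) {
            this.capacity = capacity;
        }

        boolean isFull() {
            return written == capacity;
        }

        /** Copies up to `remaining` bytes and returns how many were copied. */
        int append(int remaining) {
            int n = Math.min(remaining, capacity - written);
            written += n;
            return n;
        }
    }

    final int bufferSize;
    final BufferBuilder[] current;                // open builder per channel, null if none
    final List<List<BufferBuilder>> finished = new ArrayList<>();
    int requested;                                // how many new builders were requested

    CopyLoopSketch(int numChannels, int bufferSize) {
        this.bufferSize = bufferSize;
        this.current = new BufferBuilder[numChannels];
        for (int i = 0; i < numChannels; i++) {
            finished.add(new ArrayList<>());
        }
    }

    /** Copies one already-serialized record of `length` bytes into the channel. */
    void copyToTarget(int channel, int length) {
        BufferBuilder builder = current[channel] != null ? current[channel] : requestNew(channel);
        int remaining = length - builder.append(length);
        while (builder.isFull()) {
            finish(channel);
            if (remaining == 0) {
                break; // full record and full buffer: do not request a buffer we may not need
            }
            builder = requestNew(channel);
            remaining -= builder.append(remaining);
        }
    }

    // The per-channel slot is written only here and in finish(), never per copy.
    private BufferBuilder requestNew(int channel) {
        requested++;
        current[channel] = new BufferBuilder(bufferSize);
        return current[channel];
    }

    private void finish(int channel) {
        finished.get(channel).add(current[channel]);
        current[channel] = null;
    }
}
```

With 8-byte buffers, a 20-byte record needs three builders, and the last one is left open holding 4 bytes. Two following 2-byte records then fill that open builder without any new request.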
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 		this.numChannels = writer.getNumberOfSubpartitions();
 
-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();
 		this.bufferBuilders = new Optional[numChannels];
+		this.broadcastChannels = new int[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
+			broadcastChannels[i] = i;
 			bufferBuilders[i] = Optional.empty();
 		}
 	}
 
 	public void emit(T record) throws IOException, InterruptedException {
-		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, channelSelector.selectChannels(record, numChannels));
 	}
 
 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, broadcastChannels);
 	}
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
+		serializer.serializeRecord(record);
+
+		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+			serializer.prune();
+		}
 	}
 
-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+	private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
+		boolean pruneAfterCopying = false;
+		for (int channel : targetChannels) {
+			if (copyFromSerializerToTargetChannel(channel)) {
+				pruneAfterCopying = true;
+			}
+		}
 
-		SerializationResult result = serializer.addRecord(record);
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		if (pruneAfterCopying) {
+			serializer.prune();
+		}
+	}
 
+	/**
+	 * @param targetChannel
+	 * @return true if the intermediate serialization buffer should be pruned
+	 */
+	private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		boolean pruneTriggered = false;
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
+			tryFinishCurrentBufferBuilder(targetChannel);
+
+			//
```
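The serialize-once, copy-many structure of `emit(record, targetChannels)` can be sketched without any Flink types. In this minimal model the `Serializer` is hypothetical and stores a record as UTF-8 bytes. It also invents a pruning criterion, `bytes.length > pruneThreshold`, purely for illustration, because the real condition is cut off in the hunk. What the sketch shows faithfully is the control flow: one serialization per emit, a `reset()` before every channel copy, and at most one `prune()` no matter how many channels asked for it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SerializeOnceSketch {
    /** Hypothetical intermediate serializer: serialize once, then copy (after reset) many times. */
    static final class Serializer {
        private byte[] data = new byte[0];
        private int position;
        int serializations;
        int prunes;

        void serializeRecord(String record) {
            data = record.getBytes(StandardCharsets.UTF_8);
            position = 0;
            serializations++;
        }

        /** Rewinds the read position so the same bytes can be copied to another channel. */
        void reset() {
            position = 0;
        }

        byte[] copyRemaining() {
            byte[] copy = Arrays.copyOfRange(data, position, data.length);
            position = data.length;
            return copy;
        }

        void prune() {
            prunes++; // a real serializer would shrink an oversized intermediate buffer here
        }
    }

    final Serializer serializer = new Serializer();
    final List<List<byte[]>> channels = new ArrayList<>();
    private final int pruneThreshold; // hypothetical pruning criterion

    SerializeOnceSketch(int numChannels, int pruneThreshold) {
        this.pruneThreshold = pruneThreshold;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    void emit(String record, int[] targetChannels) {
        serializer.serializeRecord(record); // serialized exactly once per emit
        boolean pruneAfterCopying = false;
        for (int channel : targetChannels) {
            if (copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }
        // At most one prune per emit, however many channels asked for it.
        if (pruneAfterCopying) {
            serializer.prune();
        }
    }

    /** Returns true if the intermediate buffer should be pruned after copying. */
    private boolean copyFromSerializerToTargetChannel(int channel) {
        serializer.reset(); // without this, every channel after the first would get nothing
        byte[] bytes = serializer.copyRemaining();
        channels.get(channel).add(bytes);
        return bytes.length > pruneThreshold;
    }
}
```

Dropping the `reset()` call makes the problem visible at once: only the first target channel receives the record's bytes. That is why the real code resets the serializer's read position before each copy.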
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605083

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ##
```diff
@@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
```

Review comment:
Yes, that makes sense.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215832704

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 	 * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
 	 */
 	private void tryFinishCurrentBufferBuilder(int targetChannel) {
-		if (bufferBuilders[targetChannel].isPresent()) {
-			BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+		Optional bufferBuilderOpt = bufferBuilders[targetChannel];
```

Review comment:
I have not seen throughput improvements from this change either, so I will revert it.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213349150

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ##
```diff
@@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
```

Review comment:
Maybe I did not get your point correctly. I just want to verify two different interface methods of the same `RecordWriter` instance, `RecordWriter#emit()` and `RecordWriter#broadcastEmit()`, in two separate cases, because both methods are affected by this serialization improvement.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213345034

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ##
```diff
@@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int segmentSize) throws Exception {
 		// -
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+		BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
-			RecordSerializer.SerializationResult result = serializer.addRecord(record);
+			serializer.serializeRecord(record);
+			RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 			numBytes += record.length() + serializationOverhead;
 			if (numBytes < segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 			} else if (numBytes == segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+				bufferBuilder = createBufferBuilder(segmentSize);
 				numBytes = 0;
 			} else {
 				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 				while (result.isFullBuffer()) {
 					numBytes -= segmentSize;
-					result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+					bufferBuilder = createBufferBuilder(segmentSize);
+					result = serializer.copyToBufferBuilder(bufferBuilder);
```

Review comment:
Makes sense.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213352672

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ##
```diff
@@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
 		}
 	}
 
+	/**
+	 * Broadcast channel selector that selects all the output channels.
+	 */
+	private static class Broadcast implements ChannelSelector {
+
+		private int[] returnChannel;
+		boolean set;
```

Review comment:
Yes, I just copied the code from `BroadcastPartitioner`. I will simplify this code in a hotfix commit later.
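A simplified test selector along these lines could drop the separate `set` flag. In this sketch, `ChannelSelector` is redeclared locally as a stand-in with the `int[] selectChannels(record, numChannels)` shape used in the `RecordWriter` diffs. The caching strategy is an assumption, not the committed hotfix: the array is rebuilt only when the channel count changes.

```java
class BroadcastSelectorSketch {
    /** Local stand-in for the selector contract used in the RecordWriter diffs. */
    interface ChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    /** Selects every output channel; the array is built once per channel count and reused. */
    static final class Broadcast<T> implements ChannelSelector<T> {
        private int[] channels = new int[0];

        @Override
        public int[] selectChannels(T record, int numChannels) {
            // Comparing the cached length replaces the separate "set" flag.
            if (channels.length != numChannels) {
                channels = new int[numChannels];
                for (int i = 0; i < numChannels; i++) {
                    channels[i] = i;
                }
            }
            return channels;
        }
    }
}
```

Returning the same array on every call avoids an allocation per record. Callers must therefore treat the result as read-only.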
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213346998

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ##
```diff
@@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+		final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+			new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+		final ArrayDeque serializedRecords = new ArrayDeque<>();
+		final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+		for (SerializationTestType record : records) {
+			serializedRecords.add(record);
+
+			if (isBroadcastEmit) {
+				writer.broadcastEmit(record);
+			} else {
+				writer.emit(record);
+			}
+		}
+
+		final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+		for (int i = 0; i < numChannels; i++) {
```

Review comment:
Yes, I will try.
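The `requiredBuffers` expectation in this test can be checked by hand. This minimal sketch assumes that the `4 +` in the formula is the 4-byte length prefix written before each record, and that `serializationLength = 4` is the payload size of one `INT` test record. Under those assumptions each record occupies 8 bytes, a 32-byte buffer holds 4 records, and 8 records fill 2 buffers per channel.

```java
class RequiredBuffersSketch {
    /** Assumed size of the length prefix written before each record, in bytes. */
    static final int LENGTH_PREFIX_BYTES = 4;

    static int requiredBuffers(int numValues, int bufferSize, int serializationLength) {
        int bytesPerRecord = LENGTH_PREFIX_BYTES + serializationLength; // test values: 4 + 4 = 8
        int recordsPerBuffer = bufferSize / bytesPerRecord;             // test values: 32 / 8 = 4
        return numValues / recordsPerBuffer;                            // test values: 8 / 4 = 2
    }
}
```

The integer divisions are exact only because the test picks sizes that divide evenly. With other sizes, a record would span two buffers and this simple formula would no longer count the buffers correctly.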
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213343581

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```diff
@@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 		this.numChannels = writer.getNumberOfSubpartitions();
 
-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();
 		this.bufferBuilders = new Optional[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
 			bufferBuilders[i] = Optional.empty();
 		}
 	}
 
 	public void emit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		serializer.prune();
 	}
 
 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		serializer.prune();
 	}
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
-	}
+		serializer.serializeRecord(record);
 
-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+		copyToTarget(rng.nextInt(numChannels));
 
-		SerializationResult result = serializer.addRecord(record);
+		serializer.prune();
+	}
 
+	private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
+			tryFinishCurrentBufferBuilder(targetChannel);
```

Review comment:
Good question!

1. The logic around `tryFinishCurrentBufferBuilder()` differs somewhat from before. Previously the buffer builder could be absent when `tryFinishCurrentBufferBuilder()` was called, so it returned a boolean to report the outcome. Now we know the buffer builder is always present when `tryFinishCurrentBufferBuilder()` is called. Renaming it to `finishCurrentBufferBuilder()` therefore seems more appropriate, with a check added instead, as follows:
```
private void finishCurrentBufferBuilder(int targetChannel) {
	checkState(bufferBuilders[targetChannel].isPresent());
	BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
```
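The quoted `finishCurrentBufferBuilder()` snippet is cut off after the `get()` call. The sketch below is therefore a hypothetical illustration of the idea, not a reconstruction of the missing lines. It fails fast when the invariant is broken, then finishes the builder and clears the slot. `BufferBuilder` is a minimal stand-in whose `finish()` returns the number of bytes written, and `numBytesOut` is a hypothetical counter.

```java
import java.util.Optional;

class FinishBuilderSketch {
    /** Hypothetical stand-in for Flink's BufferBuilder. */
    static final class BufferBuilder {
        int written;
        boolean finished;

        /** Marks the builder finished and returns the number of bytes it holds. */
        int finish() {
            finished = true;
            return written;
        }
    }

    final Optional<BufferBuilder>[] bufferBuilders;
    long numBytesOut; // hypothetical byte counter

    @SuppressWarnings("unchecked")
    FinishBuilderSketch(int numChannels) {
        bufferBuilders = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = Optional.empty();
        }
    }

    void finishCurrentBufferBuilder(int targetChannel) {
        // Callers only get here with a builder present, so absence indicates a bug.
        if (!bufferBuilders[targetChannel].isPresent()) {
            throw new IllegalStateException("no BufferBuilder for channel " + targetChannel);
        }
        BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
        bufferBuilders[targetChannel] = Optional.empty(); // clear state for the next builder
        numBytesOut += bufferBuilder.finish();
    }
}
```

Compared with a `try...` variant that returns a boolean, this version turns an unexpected empty slot into an immediate exception instead of a silently ignored no-op.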
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213344540

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ##
```diff
@@ -40,25 +41,25 @@
 	@Test
 	public void testHasSerializedData() throws IOException {
-		final int segmentSize = 16;
-
 		final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>();
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasSerializedData());
 
-		serializer.addRecord(randomIntRecord);
+		serializer.serializeRecord(randomIntRecord);
 		Assert.assertTrue(serializer.hasSerializedData());
 
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+		final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+		serializer.copyToBufferBuilder(bufferBuilder1);
 		Assert.assertFalse(serializer.hasSerializedData());
 
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-		serializer.addRecord(randomIntRecord);
+		final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+		serializer.serializeRecord(randomIntRecord);
```

Review comment:
Good idea.
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210195493

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ##
```diff
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
 	}
 
 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;
```

Review comment:
I agree that `RecordSerializer`'s private fields should not be exposed to `SerializedRecord`, for safety reasons. As for having `SerializedRecord` reference the `RecordSerializer` directly, the whole process in `RecordWriter` might look like this:
```
SerializedRecord serializedRecord = serializer.serializeRecord(record);
for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
	// reset position for multiple copying
	serializer.reset();
	BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
	CopyingResult result = serializedRecord.copyToBufferBuilder(bufferBuilder);
}
serializer.prune();
```
`SerializedRecord` would only provide the `copyToBufferBuilder` method and wrap the inner class `CopyingResult`, but the actual copying would still be done in `RecordSerializer`. So `RecordSerializer` would also need to provide `copyToBufferBuilder`. In that sense we would not really move the two steps of serialization and copying out of `RecordSerializer`, so I think it may not be worth doing. :)
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209551509

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ##
```diff
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
 	}
 
 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;
```

Review comment:
The previous `RecordSerializer` also confused me a lot, and I had the same experience as you. The previous `addRecord` and `continueWritingWithNextBufferBuilder` methods could be called in any order, and both returned a `SerializationResult`. In my current restructuring, `serializeRecord` must be called first, and then `copyToBufferBuilder` is called to return the final `SerializationResult`. I think this is a bit clearer than before.

I agree that your idea above of separating these two methods further is good. But `RecordSerializer` and `SerializedRecord` may still be closely coupled. I see two ways to implement `SerializedRecord#copyToBufferBuilder`:

1.
```
public SerializedRecord(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
}

CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
	// copy lengthBuffer
	// copy dataBuffer
	// get CopyingResult
}
```
This way, `SerializedRecord` can only see `lengthBuffer` and `dataBuffer` and cannot interact with `RecordSerializer`. We may not need to do anything in `SerializedRecord#close()`.

2.
```
public SerializedRecord(RecordSerializer serializer) {
}

CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
	serializer.copyToBufferBuilder();
	// get CopyingResult
}
```
This way, `SerializedRecord` can see and interact with `RecordSerializer`. The only difference seems to be that we separate `SerializedRecord` from `CopyingResult`. My current implementation hides `SerializedRecord` and returns a `SerializationResult`, which corresponds to `CopyingResult`, as the final result. What do you think of these approaches?
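Option 1 can be made concrete with `java.nio.ByteBuffer` alone. In this minimal sketch a plain `ByteBuffer` stands in for `BufferBuilder`, and the method is named `copyToTarget` to reflect that swap. `serializeInt` is a hypothetical helper that writes a 4-byte length prefix followed by a 4-byte payload. The `CopyingResult` names mirror the `SerializationResult` values used in `SpanningRecordSerializerTest`. The record holds only its two buffers and never touches a serializer, and `reset()` rewinds them so the same record can be copied to another target.

```java
import java.nio.ByteBuffer;

class SerializedRecordSketch {
    enum CopyingResult {
        FULL_RECORD,
        FULL_RECORD_MEMORY_SEGMENT_FULL,
        PARTIAL_RECORD_MEMORY_SEGMENT_FULL
    }

    /** Option 1: sees only the length and data buffers, with no reference to the serializer. */
    static final class SerializedRecord {
        private final ByteBuffer lengthBuffer;
        private final ByteBuffer dataBuffer;

        SerializedRecord(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
            this.lengthBuffer = lengthBuffer;
            this.dataBuffer = dataBuffer;
        }

        /** Rewinds both buffers so the same record can be copied to another target. */
        void reset() {
            lengthBuffer.rewind();
            dataBuffer.rewind();
        }

        CopyingResult copyToTarget(ByteBuffer target) {
            copy(lengthBuffer, target);
            copy(dataBuffer, target);
            if (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()) {
                return CopyingResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL; // target ran out of space
            }
            return target.hasRemaining()
                ? CopyingResult.FULL_RECORD
                : CopyingResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
        }

        /** Copies as many bytes as fit from source into target, advancing both positions. */
        private static void copy(ByteBuffer source, ByteBuffer target) {
            int n = Math.min(source.remaining(), target.remaining());
            ByteBuffer chunk = source.duplicate();
            chunk.limit(chunk.position() + n);
            target.put(chunk);
            source.position(source.position() + n);
        }
    }

    /** Hypothetical helper: a 4-byte length prefix followed by a 4-byte int payload. */
    static SerializedRecord serializeInt(int value) {
        ByteBuffer data = ByteBuffer.allocate(4);
        data.putInt(value);
        data.flip();
        ByteBuffer length = ByteBuffer.allocate(4);
        length.putInt(data.remaining());
        length.flip();
        return new SerializedRecord(length, data);
    }
}
```

Copying this 8-byte record into a 6-byte target yields a partial result. A second copy into a fresh 8-byte target then finishes the record, leaving 6 bytes free. Because the buffers carry their own positions, the record needs no callback into the serializer, which matches the decoupling that option 1 describes.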