[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-14 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217663737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) 
throws IOException, In
}
 
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent());
 
 Review comment:
   The call from `getBufferBuilder` does not need this `checkState`. For the 
call from `copyFromSerializerToTargetChannel`, however, the check may be 
necessary to guard against bugs.
   
   I removed it only to reduce some overhead. I will restore this check. :)
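
As an illustration of the trade-off discussed in this comment, here is a minimal self-contained sketch. It is not the Flink code: the `Slots` class, its `String` "builders", and the local `checkState` helper are hypothetical stand-ins for the `bufferBuilders` array and the precondition utility used in `RecordWriter`. The check costs one branch per request, but it turns a silent overwrite of an unfinished builder into an immediate failure.

```java
import java.util.Optional;

// Hypothetical stand-in for the per-channel BufferBuilder slots in RecordWriter.
final class Slots {
    private final Optional<String>[] slots;

    @SuppressWarnings("unchecked")
    Slots(int numChannels) {
        slots = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            slots[i] = Optional.empty();
        }
    }

    // Minimal precondition helper, defined locally so the sketch is self-contained.
    static void checkState(boolean condition) {
        if (!condition) {
            throw new IllegalStateException("slot is still occupied");
        }
    }

    // Requests a new builder for the channel; the slot must be empty beforehand.
    String requestNew(int channel, String builder) {
        checkState(!slots[channel].isPresent());
        slots[channel] = Optional.of(builder);
        return builder;
    }

    // Marks the current builder as finished and clears the slot.
    void finish(int channel) {
        slots[channel] = Optional.empty();
    }

    public static void main(String[] args) {
        Slots s = new Slots(2);
        s.requestNew(0, "builder-a");
        s.finish(0);
        s.requestNew(0, "builder-b"); // fine: the slot was cleared first
        try {
            s.requestNew(0, "builder-c"); // bug: the previous builder was never finished
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Without the check, the third request would simply replace `builder-b`, and the data it held would be lost without any error.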


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-14 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217661048
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   serializer.serializeRecord(record);
-
-   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-   serializer.prune();
-   }
+   emit(record, new int[] { rng.nextInt(numChannels) });
 
 Review comment:
   got it




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-14 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217619644
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   Thanks for this further suggestion!
   
   I agree with the idea of keeping the logic in the loop simple and reducing 
the overhead related to the `BufferBuilder` array.
   
   I adjusted the process slightly differently from the code above. I think 
`bufferBuilders[targetChannel] = Optional.ofNullable(bufferBuilder)` does not 
need to be called on every copy, because it only matters once we enter the 
`while` loop. In the common case of small records, one `BufferBuilder` can hold 
many serialization results, so I still set the `BufferBuilder` array entry only 
after requesting a new buffer. This adds only a little overhead when one 
serialized record spans multiple `BufferBuilder`s. What do you think?
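
The argument above can be sketched with a small single-channel model. This is an illustration under stated assumptions, not the `RecordWriter` implementation: `LazySlotUpdate`, `current`, `slotWrites`, and the use of `ByteBuffer` in place of `BufferBuilder` are all hypothetical. The cached slot is assigned only when a new buffer is requested, so many small records share one assignment, and only a record that spans several buffers causes more.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-channel model; `current` plays the role of bufferBuilders[targetChannel].
final class LazySlotUpdate {
    private final int bufferSize;
    private ByteBuffer current;                       // cached buffer for this channel
    int slotWrites = 0;                               // how often a new buffer was stored in the slot
    final List<ByteBuffer> finished = new ArrayList<>();

    LazySlotUpdate(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // The only place where a new buffer is stored in the slot.
    private ByteBuffer requestNewBuffer() {
        current = ByteBuffer.allocate(bufferSize);
        slotWrites++;
        return current;
    }

    // Copies one serialized record; a new buffer is requested only when the old one is full.
    void copy(byte[] serialized) {
        ByteBuffer target = (current != null) ? current : requestNewBuffer();
        int offset = 0;
        while (offset < serialized.length) {
            int n = Math.min(target.remaining(), serialized.length - offset);
            target.put(serialized, offset, n);
            offset += n;
            if (!target.hasRemaining()) {
                finished.add(target);                 // finish the full buffer
                current = null;
                if (offset < serialized.length) {     // the record spans into another buffer
                    target = requestNewBuffer();
                }
            }
        }
    }

    public static void main(String[] args) {
        LazySlotUpdate small = new LazySlotUpdate(32);
        for (int i = 0; i < 3; i++) {
            small.copy(new byte[8]);
        }
        System.out.println("small records, slot writes: " + small.slotWrites);

        LazySlotUpdate large = new LazySlotUpdate(32);
        large.copy(new byte[80]);
        System.out.println("spanning record, slot writes: " + large.slotWrites);
    }
}
```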


[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyFromSerializerToTargetChannel(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* @param targetChannel
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   boolean pruneTriggered = false;
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
+
+   // 

[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605083
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Yes, that makes sense.




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215832704
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
 
 Review comment:
   I have not seen any throughput improvement from this change either, so I 
will revert it.




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213349150
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Maybe I did not understand your point correctly.
   I just want to verify the two different interface methods on the same 
`RecordWriter` instance, namely `RecordWriter#emit()` and 
`RecordWriter#broadcastEmit()`, in two separate test cases, because both 
methods are affected by this serialization improvement.




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213345034
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int 
segmentSize) throws Exception {
 
// 
-
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+   BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
int numBytes = 0;
for (SerializationTestType record : records) {
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
+   serializer.serializeRecord(record);
+   RecordSerializer.SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
numBytes += record.length() + serializationOverhead;
 
if (numBytes < segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
numBytes = 0;
} else {

Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
while (result.isFullBuffer()) {
numBytes -= segmentSize;
-   result = 
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
+   result = 
serializer.copyToBufferBuilder(bufferBuilder);
 
 Review comment:
   Makes sense.




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213352672
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   Yes, I just copied the code from `BroadcastPartitioner`, and I will simplify 
it in a hotfix commit later.




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213346998
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new 
ArrayDeque<>();
+   final Iterable records = 
Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + 
serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   Yes, I will try.
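
For reference, the `requiredBuffers` expression in the quoted test can be worked through with the constants it declares. The sketch below only restates that arithmetic. Reading the literal `4` as a per-record length header is an assumption, and the class and parameter names are hypothetical.

```java
// Worked arithmetic for the requiredBuffers expression in the quoted test.
final class RequiredBuffers {
    static int requiredBuffers(int numValues, int bufferSize, int lengthHeader, int serializationLength) {
        int bytesPerRecord = lengthHeader + serializationLength; // 4 + 4 = 8 (header size is an assumption)
        int recordsPerBuffer = bufferSize / bytesPerRecord;      // 32 / 8 = 4
        return numValues / recordsPerBuffer;                     // 8 / 4 = 2
    }

    public static void main(String[] args) {
        // Constants taken from the test: numValues = 8, bufferSize = 32, serializationLength = 4.
        System.out.println(requiredBuffers(8, 32, 4, 4)); // prints 2
    }
}
```

Because both divisions are integer divisions, the result is exact only when the records fill the buffers evenly, as they do with these constants.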




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213343581
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   Good question!
   1. Regarding `tryFinishCurrentBufferBuilder()`, the logic is somewhat 
different from before. Previously, the buffer builder might be absent when 
`tryFinishCurrentBufferBuilder()` was called, so it returned a boolean value to 
indicate the result. Now we know the buffer builder is always present when 
`tryFinishCurrentBufferBuilder()` is called, so renaming it to 
`finishCurrentBufferBuilder()` seems more appropriate, with a check added as 
follows:
 ```
   private void finishCurrentBufferBuilder(int targetChannel) {
checkState(bufferBuilders[targetChannel].isPresent());
   
BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();

[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-28 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213344540
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   good idea




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-15 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210195493
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I agree that, for safety, `RecordSerializer`'s internal private fields 
should not be exposed to `SerializedRecord`.
   
   As for the option of referencing the `RecordSerializer` directly in 
`SerializedRecord`, the whole process in `RecordWriter` might look like this:
   
       SerializedRecord serializedRecord = serializer.serializeRecord(record);
       for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
           // reset position for multiple copying
           serializer.reset();
           BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
           CopyingResult result = serializedRecord.copyToBufferBuilder(bufferBuilder);
       }
       serializer.prune();
   
   The `SerializedRecord` only provides the `copyToBufferBuilder` method and 
wraps the internal class `CopyingResult`, but the actual copying is still done 
in `RecordSerializer`, so `RecordSerializer` also needs to provide a 
`copyToBufferBuilder` method. In that case we do not actually separate the two 
steps of serialization and copying out of `RecordSerializer`, so I think it may 
not be worth doing. :)
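
The serialize-once, copy-many flow discussed in this comment can be sketched with plain `java.nio.ByteBuffer`. This is a simplified illustration, not the Flink API: `SerializeOnceCopyMany`, `serialize`, and `broadcast` are hypothetical names, and each channel buffer is assumed to have enough free space for the whole record. The point it shows is why the read position must be reset before every copy.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: one intermediate buffer is filled once and copied to every channel.
final class SerializeOnceCopyMany {
    // Serializes the record once: a 4-byte length followed by the payload.
    static ByteBuffer serialize(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Copies the same serialized bytes into every channel buffer.
    static void broadcast(ByteBuffer serialized, ByteBuffer[] channels) {
        for (ByteBuffer channel : channels) {
            serialized.rewind();     // reset the read position before each copy
            channel.put(serialized); // reads `serialized` up to its limit
        }
    }

    public static void main(String[] args) {
        ByteBuffer serialized = serialize("abc");
        ByteBuffer[] channels = { ByteBuffer.allocate(16), ByteBuffer.allocate(16) };
        broadcast(serialized, channels);
        for (ByteBuffer channel : channels) {
            System.out.println("bytes written: " + channel.position());
        }
    }
}
```

Without the `rewind()` call, the first copy would leave the intermediate buffer fully consumed, and every later channel would receive zero bytes.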




[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-13 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209551509
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   The previous `RecordSerializer` also confused me a lot, and I had the same 
experience as you, because the previous `addRecord` and 
`continueWritingWithNextBufferBuilder` methods could be called in arbitrary 
order and both returned a `SerializationResult`.
   
   In my current refactoring, `serializeRecord` must be called first, and then 
`copyToBufferBuilder` is called to return the final `SerializationResult`. I 
think this is a bit clearer than before.
   
   I agree that your idea above is good for separating these two methods 
further. But `RecordSerializer` and `SerializedRecord` may still be closely 
coupled. I think there are two ways to implement 
`SerializedRecord#copyToBufferBuilder`:
   1. 
   ```
   public SerializedRecord(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
   }
   
   CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
       // copy lengthBuffer
       // copy dataBuffer
       // get CopyingResult
   }
   ```
   
   In this way `SerializedRecord` can only see `lengthBuffer` and `dataBuffer` 
and cannot interact with `RecordSerializer`. Maybe we would not need to do 
anything in `SerializedRecord#close()`.
   
   2. 
   ```
   public SerializedRecord(RecordSerializer serializer) {
   }
   
   CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
       serializer.copyToBufferBuilder(bufferBuilder);
       // get CopyingResult
   }
   ```
   
   In this way `SerializedRecord` can see and interact with `RecordSerializer`, 
but the only difference seems to be that we separate `SerializedRecord` from 
`CopyingResult`. My current implementation hides `SerializedRecord` and returns 
`SerializationResult`, which corresponds to `CopyingResult`, as the final 
result.
   
   What do you think of these two approaches?
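
The two-step contract described in this comment (serialize first, then copy and get a result) can be sketched as follows. This is a simplified model and not Flink's `RecordSerializer`: `TwoPhaseSerializer` and `copyTo` are hypothetical, `ByteBuffer` stands in for `BufferBuilder`, and the enum constants only mirror the `SerializationResult` names that appear in the quoted tests.

```java
import java.nio.ByteBuffer;

// Hypothetical two-phase serializer: serializeRecord() must be called before copyTo().
final class TwoPhaseSerializer {
    enum Result { FULL_RECORD, FULL_RECORD_MEMORY_SEGMENT_FULL, PARTIAL_RECORD_MEMORY_SEGMENT_FULL }

    private ByteBuffer data; // intermediate buffer, null until serializeRecord is called

    // Phase 1: serialize into the intermediate buffer (4-byte length + payload).
    void serializeRecord(byte[] payload) {
        data = ByteBuffer.allocate(4 + payload.length);
        data.putInt(payload.length).put(payload);
        data.flip();
    }

    // Phase 2: copy as much as fits into the target and report the outcome.
    Result copyTo(ByteBuffer target) {
        if (data == null) {
            throw new IllegalStateException("serializeRecord must be called first");
        }
        int n = Math.min(data.remaining(), target.remaining());
        for (int i = 0; i < n; i++) {
            target.put(data.get());
        }
        if (data.hasRemaining()) {
            return Result.PARTIAL_RECORD_MEMORY_SEGMENT_FULL; // target is full, record continues
        }
        return target.hasRemaining() ? Result.FULL_RECORD : Result.FULL_RECORD_MEMORY_SEGMENT_FULL;
    }

    public static void main(String[] args) {
        TwoPhaseSerializer serializer = new TwoPhaseSerializer();
        serializer.serializeRecord(new byte[12]); // 16 serialized bytes in total
        Result result = serializer.copyTo(ByteBuffer.allocate(8));
        while (result == Result.PARTIAL_RECORD_MEMORY_SEGMENT_FULL) {
            result = serializer.copyTo(ByteBuffer.allocate(8)); // continue in a fresh buffer
        }
        System.out.println(result);
    }
}
```

Only the copy step returns a result, so the caller has a single place to react to a full buffer, whichever target buffer it is writing to.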

