[hotfix][network] Drop redundant this reference usages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f60a1de Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f60a1de Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f60a1de Branch: refs/heads/master Commit: 1f60a1de563ccca4ea0309fbd5c6c3531090ddc9 Parents: 1752fdb Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Dec 18 15:26:20 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Jan 8 11:46:00 2018 +0100 ---------------------------------------------------------------------- .../serialization/SpanningRecordSerializer.java | 76 ++++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1f60a1de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 87b9e4c..7394f83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -59,14 +59,14 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R private int limit; public SpanningRecordSerializer() { - this.serializationBuffer = new DataOutputSerializer(128); + serializationBuffer = new DataOutputSerializer(128); - this.lengthBuffer = ByteBuffer.allocate(4); - this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); + lengthBuffer = ByteBuffer.allocate(4); + lengthBuffer.order(ByteOrder.BIG_ENDIAN); // ensure initial state with hasRemaining false (for correct setNextBuffer logic) - this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); - this.lengthBuffer.position(4); + dataBuffer = serializationBuffer.wrapAsByteBuffer(); + lengthBuffer.position(4); } /** @@ -81,50 +81,50 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R @Override public SerializationResult addRecord(T record) throws IOException { if (CHECKED) { - if (this.dataBuffer.hasRemaining()) { + if (dataBuffer.hasRemaining()) { throw new IllegalStateException("Pending serialization of previous record."); } } - this.serializationBuffer.clear(); - this.lengthBuffer.clear(); + serializationBuffer.clear(); + lengthBuffer.clear(); // write data and length - record.write(this.serializationBuffer); + record.write(serializationBuffer); - int len = this.serializationBuffer.length(); - this.lengthBuffer.putInt(0, len); + int len = serializationBuffer.length(); + lengthBuffer.putInt(0, len); - this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); + dataBuffer = serializationBuffer.wrapAsByteBuffer(); // Copy from intermediate buffers to current target memory segment - copyToTargetBufferFrom(this.lengthBuffer); - copyToTargetBufferFrom(this.dataBuffer); + copyToTargetBufferFrom(lengthBuffer); + copyToTargetBufferFrom(dataBuffer); return getSerializationResult(); } @Override public SerializationResult setNextBuffer(Buffer buffer) throws IOException { - this.targetBuffer = buffer; - this.position = 0; - this.limit = buffer.getSize(); + targetBuffer = buffer; + position = 0; + limit = buffer.getSize(); - if (this.lengthBuffer.hasRemaining()) { - copyToTargetBufferFrom(this.lengthBuffer); + if (lengthBuffer.hasRemaining()) { + copyToTargetBufferFrom(lengthBuffer); } - if (this.dataBuffer.hasRemaining()) { - copyToTargetBufferFrom(this.dataBuffer); + if (dataBuffer.hasRemaining()) { + copyToTargetBufferFrom(dataBuffer); } SerializationResult result = getSerializationResult(); // make sure we don't hold onto the large buffers for too long if (result.isFullRecord()) { - this.serializationBuffer.clear(); - this.serializationBuffer.pruneBuffer(); - this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); + serializationBuffer.clear(); + serializationBuffer.pruneBuffer(); + dataBuffer = serializationBuffer.wrapAsByteBuffer(); } return result; @@ -137,22 +137,22 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R * @param source the {@link ByteBuffer} to copy data from */ private void copyToTargetBufferFrom(ByteBuffer source) { - if (this.targetBuffer == null) { + if (targetBuffer == null) { return; } int needed = source.remaining(); - int available = this.limit - this.position; + int available = limit - position; int toCopy = Math.min(needed, available); - this.targetBuffer.getMemorySegment().put(this.position, source, toCopy); + targetBuffer.getMemorySegment().put(position, source, toCopy); - this.position += toCopy; + position += toCopy; } private SerializationResult getSerializationResult() { - if (!this.dataBuffer.hasRemaining() && !this.lengthBuffer.hasRemaining()) { - return (this.position < this.limit) + if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) { + return (position < limit) ? SerializationResult.FULL_RECORD : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL; } @@ -166,8 +166,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R return null; } - this.targetBuffer.setSize(this.position); - return this.targetBuffer; + targetBuffer.setSize(position); + return targetBuffer; } @Override @@ -179,19 +179,19 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R @Override public void clear() { - this.targetBuffer = null; - this.position = 0; - this.limit = 0; + targetBuffer = null; + position = 0; + limit = 0; // ensure clear state with hasRemaining false (for correct setNextBuffer logic) - this.dataBuffer.position(this.dataBuffer.limit()); - this.lengthBuffer.position(4); + dataBuffer.position(dataBuffer.limit()); + lengthBuffer.position(4); } @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); + return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()); } @Override