[ https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070236#comment-14070236 ]
ASF GitHub Bot commented on FLINK-987:
--------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/77#discussion_r15226580
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/AbstractPagedOutputView.java ---
@@ -20,397 +20,233 @@
package org.apache.flink.runtime.memorymanager;
import java.io.IOException;
-import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
/**
- * The base class for all output views that are backed by multiple memory pages. This base class contains all
- * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
- * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
- * <p>
- * The paging assumes that all memory segments are of the same size.
+ * The base class for all output views that are backed by multiple memory pages which are not always kept in memory
+ * but which are supposed to be spilled after being written to. The concrete sub classes must
+ * implement the methods to collect the current page and provide the next memory page once the boundary is crossed or
+ * once the underlying buffers are unlocked by a call to {@link #unlock()}.
*/
-public abstract class AbstractPagedOutputView implements DataOutputView {
-
- private MemorySegment currentSegment; // the current memory segment to write to
-
- protected final int segmentSize; // the size of the memory segments
-
- protected final int headerLength; // the number of bytes to skip at the beginning of each segment
-
- private int positionInSegment; // the offset in the current segment
-
- private byte[] utfBuffer; // the reusable array for UTF encodings
-
-
+public abstract class AbstractPagedOutputView extends AbstractMemorySegmentOutputView {
+
+ // Can we just return the current segment when we get the next one or do we have to keep
+ // them because {@link lock} was called. We need a counter because of nested locking.
+ protected int lockCount;
+
+ // The segments that are currently locked. When {@link lock} is called we lock the current
+ // segment and all the segments that we "advance" to. Only when {@link unlock} is called can we return
+ // all of the segments but the most recent one.
+ private List<MemorySegment> lockedSegments;
+
+ // Index of current segment in list of locked segments. When we are not locked this is set to 0, because
+ // then we always only have one segment.
+ private int currentSegmentIndex;
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
-
+
/**
* Creates a new output view that writes initially to the given initial segment. All segments in the
* view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
* at the beginning of each segment.
- *
+ *
* @param initialSegment The segment that the view starts writing to.
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
+ super(initialSegment, segmentSize, headerLength);
if (initialSegment == null) {
throw new NullPointerException("Initial Segment may not be null");
}
- this.segmentSize = segmentSize;
- this.headerLength = headerLength;
this.currentSegment = initialSegment;
this.positionInSegment = headerLength;
+
+ lockedSegments = new ArrayList<MemorySegment>();
+
+ // at the beginning we only have one segment
+ currentSegmentIndex = 0;
+ lockCount = 0;
}
-
+
/**
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
- protected AbstractPagedOutputView(int segmentSize, int headerLength)
- {
- this.segmentSize = segmentSize;
- this.headerLength = headerLength;
+ protected AbstractPagedOutputView(int segmentSize, int headerLength) {
+ super(segmentSize, headerLength);
+
+ lockedSegments = new ArrayList<MemorySegment>();
+ currentSegmentIndex = 0;
+ lockCount = 0;
}
-
+
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------
-
+
/**
- *
+ *
* This method must return a segment. If no more segments are available, it must throw an
* {@link java.io.EOFException}.
- *
- * @param current The current memory segment
- * @param positionInCurrent The position in the segment, one after the last valid byte.
- * @return The next memory segment.
- *
- * @throws IOException
+ *
+ * @return The next memory segment.
+ *
+ * @throws java.io.IOException
+ */
+
+ protected abstract MemorySegment requestSegment() throws IOException;
+
+ /**
+ *
+ * This method returns a memory segment that has been filled to where it came from. Child classes
+ * are responsible for handling the segments.
+ *
+ * @param segment The memory segment
+ * @param bytesWritten The position in the segment, one after the last valid byte.
+ *
+ * @throws java.io.IOException
*/
- protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
-
-
+ protected abstract void returnSegment(MemorySegment segment, int bytesWritten) throws IOException;
+
+
/**
* Gets the segment that the view currently writes to.
- *
+ *
* @return The segment the view currently writes to.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}
-
+
/**
* Gets the current write position (the position where the next bytes will be written)
* in the current memory segment.
- *
+ *
* @return The current write offset in the current memory segment.
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}
-
+
/**
* Gets the size of the segments used by this view.
- *
+ *
* @return The memory segment size.
*/
public int getSegmentSize() {
return this.segmentSize;
}
-
+
/**
* Moves the output view to the next page. This method invokes internally the
- * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
- * implementation and obtain the next segment to write to. Writing will continue inside the new segment
- * after the header.
- *
- * @throws IOException Thrown, if the current segment could not be processed or a new segment could not
- * be obtained.
+ * {@link #requestSegment()} and {@link #returnSegment(org.apache.flink.core.memory.MemorySegment, int)} methods
+ * to give the current memory segment to the concrete subclass' implementation and obtain the next segment to
+ * write to. Writing will continue inside the new segment after the header.
+ *
+ * @throws java.io.IOException Thrown, if the current segment could not be processed or a new segment could not
+ * be obtained.
*/
- protected void advance() throws IOException {
- this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
- this.positionInSegment = this.headerLength;
- }
-
- /**
- * Sets the internal state to the given memory segment and the given position within the segment.
- *
- * @param seg The memory segment to write the next bytes to.
- * @param position The position to start writing the next bytes to.
- */
- protected void seekOutput(MemorySegment seg, int position) {
- this.currentSegment = seg;
- this.positionInSegment = position;
- }
-
- /**
- * Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
- * {@link #seekOutput(MemorySegment, int)} is called.
- *
- * @see #advance()
- * @see #seekOutput(MemorySegment, int)
- */
- protected void clear() {
- this.currentSegment = null;
- this.positionInSegment = this.headerLength;
- }
-
- // --------------------------------------------------------------------------------------------
- // Data Output Specific methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void write(int b) throws IOException {
- writeByte(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
@Override
- public void write(byte[] b, int off, int len) throws IOException {
- int remaining = this.segmentSize - this.positionInSegment;
- if (remaining >= len) {
- this.currentSegment.put(this.positionInSegment, b, off, len);
- this.positionInSegment += len;
- }
- else {
- if (remaining == 0) {
- advance();
- remaining = this.segmentSize - this.positionInSegment;
- }
- while (true) {
- int toPut = Math.min(remaining, len);
- this.currentSegment.put(this.positionInSegment, b, off, toPut);
- off += toPut;
- len -= toPut;
-
- if (len > 0) {
- this.positionInSegment = this.segmentSize;
- advance();
- remaining = this.segmentSize - this.positionInSegment;
- }
- else {
- this.positionInSegment += toPut;
- break;
- }
+ protected void advance() throws IOException {
+ if (lockCount > 0) {
+ if (currentSegmentIndex < lockedSegments.size() - 1) {
+ currentSegmentIndex++;
+ currentSegment = lockedSegments.get(currentSegmentIndex);
+ } else {
+ currentSegmentIndex++;
+ currentSegment = requestSegment();
+ lockedSegments.add(currentSegment);
}
+ } else {
+ returnSegment(currentSegment, positionInSegment);
+ currentSegment = requestSegment();
+ currentSegmentIndex = 0;
}
+ positionInSegment = headerLength;
}
- @Override
- public void writeBoolean(boolean v) throws IOException {
- writeByte(v ? 1 : 0);
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- if (this.positionInSegment < this.segmentSize) {
- this.currentSegment.put(this.positionInSegment++, (byte) v);
- }
- else {
- advance();
- writeByte(v);
+ public void lock() {
+ lockCount++;
+ if (lockCount == 1) {
+ // we are the first to lock, so start the "locked segments" list
+ Preconditions.checkState(lockedSegments.isEmpty(), "List of locked segments must be empty.");
+ Preconditions.checkState(currentSegmentIndex == 0, "Before locking currentSegmentIndex must always be 0.");
+ lockedSegments.add(currentSegment);
}
}
- @Override
- public void writeShort(int v) throws IOException {
- if (this.positionInSegment < this.segmentSize - 1) {
- this.currentSegment.putShort(this.positionInSegment, (short) v);
- this.positionInSegment += 2;
- }
- else if (this.positionInSegment == this.segmentSize) {
- advance();
- writeShort(v);
- }
- else {
- writeByte(v >> 8);
- writeByte(v);
- }
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- if (this.positionInSegment < this.segmentSize - 1) {
- this.currentSegment.putChar(this.positionInSegment, (char) v);
- this.positionInSegment += 2;
- }
- else if (this.positionInSegment == this.segmentSize) {
- advance();
- writeChar(v);
- }
- else {
- writeByte(v >> 8);
- writeByte(v);
- }
- }
-
- @Override
- public void writeInt(int v) throws IOException {
- if (this.positionInSegment < this.segmentSize - 3) {
- this.currentSegment.putIntBigEndian(this.positionInSegment, v);
- this.positionInSegment += 4;
- }
- else if (this.positionInSegment == this.segmentSize) {
- advance();
- writeInt(v);
- }
- else {
- writeByte(v >> 24);
- writeByte(v >> 16);
- writeByte(v >> 8);
- writeByte(v);
- }
- }
+ // here positions are relative to first locked segment
+ public long tell() {
+ Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
- @Override
- public void writeLong(long v) throws IOException {
- if (this.positionInSegment < this.segmentSize - 7) {
- this.currentSegment.putLongBigEndian(this.positionInSegment, v);
- this.positionInSegment += 8;
- }
- else if (this.positionInSegment == this.segmentSize) {
- advance();
- writeLong(v);
- }
- else {
- writeByte((int) (v >> 56));
- writeByte((int) (v >> 48));
- writeByte((int) (v >> 40));
- writeByte((int) (v >> 32));
- writeByte((int) (v >> 24));
- writeByte((int) (v >> 16));
- writeByte((int) (v >> 8));
- writeByte((int) v);
- }
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- writeInt(Float.floatToRawIntBits(v));
- }
-
- @Override
- public void writeDouble(double v) throws IOException {
- writeLong(Double.doubleToRawLongBits(v));
- }
-
- @Override
- public void writeBytes(String s) throws IOException {
- for (int i = 0; i < s.length(); i++) {
- writeByte(s.charAt(i));
- }
+ return currentSegmentIndex * segmentSize + positionInSegment;
}
- @Override
- public void writeChars(String s) throws IOException {
- for (int i = 0; i < s.length(); i++) {
- writeChar(s.charAt(i));
- }
- }
+ public void seek(long position) throws IOException {
+ Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
+ Preconditions.checkArgument(position >= 0, "position must be positive");
--- End diff --
Changing it. Man @uce is strict... :smile:
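The locking scheme introduced in this diff (`lock()`, the locked branch of `advance()`, `tell()`, `seek()`, `unlock()`) can be modeled with a small, self-contained class. This is a sketch under stated assumptions, not the code from the pull request: `LockablePagedOutput`, `requestPage()`, `returnPage()`, and `returned` are hypothetical names, plain `byte[]` pages stand in for `MemorySegment`, and because the diff ends partway into `seek()`, its body here is an assumed inverse of `tell()`. As in the diff, `advance()` keeps every page reachable while the view is locked, and `tell()` counts whole pages with their headers included.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the diff's lock/seek scheme; byte[] stands in for MemorySegment. */
class LockablePagedOutput {
    final int pageSize;
    final int headerLength;
    final List<byte[]> returned = new ArrayList<>();     // pages handed back, e.g. for spilling
    private final List<byte[]> lockedPages = new ArrayList<>();
    private int lockCount;                                // a counter, so lock() can be nested
    private int currentIndex;                             // index into lockedPages; 0 when unlocked
    private byte[] current;
    private int position;

    LockablePagedOutput(int pageSize, int headerLength) {
        this.pageSize = pageSize;
        this.headerLength = headerLength;
        this.current = requestPage();
        this.position = headerLength;
    }

    // Hypothetical stand-ins for requestSegment()/returnSegment(): a real view would
    // take pages from a memory manager and spill or recycle the returned ones.
    byte[] requestPage() { return new byte[pageSize]; }
    void returnPage(byte[] page, int bytesWritten) { returned.add(page); }

    void advance() {
        if (lockCount > 0) {
            currentIndex++;
            if (currentIndex < lockedPages.size()) {
                current = lockedPages.get(currentIndex);  // revisit a page we seeked back over
            } else {
                current = requestPage();                  // grow the locked region
                lockedPages.add(current);
            }
        } else {
            returnPage(current, position);                // unlocked: the full page goes right away
            current = requestPage();
            currentIndex = 0;
        }
        position = headerLength;
    }

    void writeByte(int b) {
        if (position == pageSize) {
            advance();
        }
        current[position++] = (byte) b;
    }

    void lock() {
        if (++lockCount == 1) {
            lockedPages.add(current);                     // the current page is the first locked one
            currentIndex = 0;
        }
    }

    // As in the diff: relative to the first locked page, with headers counted.
    long tell() {
        check(lockCount > 0, "must be locked");
        return (long) currentIndex * pageSize + position;
    }

    // Assumed inverse of tell(); the diff ends before seek()'s body.
    void seek(long pos) {
        check(lockCount > 0 && pos >= 0, "must be locked and pos >= 0");
        int index = (int) (pos / pageSize);
        int offset = (int) (pos % pageSize);
        if (offset == 0 && index > 0) {                   // the end of a full page
            index--;
            offset = pageSize;
        }
        check(index < lockedPages.size() && offset >= headerLength, "outside locked pages");
        currentIndex = index;
        current = lockedPages.get(index);
        position = offset;
    }

    // Assumption: the caller seeks to the end of its data before the outermost unlock().
    void unlock() {
        check(lockCount > 0, "not locked");
        if (--lockCount == 0) {
            check(currentIndex == lockedPages.size() - 1, "seek to the end before unlocking");
            for (int i = 0; i < currentIndex; i++) {
                returnPage(lockedPages.get(i), pageSize); // all but the most recent page are full
            }
            lockedPages.clear();
            currentIndex = 0;
        }
    }

    private static void check(boolean ok, String msg) {
        if (!ok) throw new IllegalStateException(msg);
    }
}
```

A typical use would be back-patching a length prefix: `lock()`, save `tell()`, write a placeholder and the payload, `seek()` back to patch the placeholder, `seek()` to the saved end, and `unlock()`. Because `lockCount` is a counter, only the outermost `unlock()` hands the earlier pages back.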
> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---------------------------------------------------------------------------
>
> Key: FLINK-987
> URL: https://issues.apache.org/jira/browse/FLINK-987
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Affects Versions: 0.6-incubating
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
> Fix For: 0.6-incubating
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest
> changing the way that the TypeSerializers/Comparators and
> DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary
> representation of data types, and to allow partial deserialization of
> individual fields. Both are currently prevented by the fact that the
> abstraction of the memory (into which the data goes) is a stream abstraction
> ({{DataInputView}}, {{DataOutputView}}).
> One idea is to offer a random-access, buffer-like view for construction and
> random-access deserialization, as well as various methods to copy elements in
> binary form between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
>
> T deserialize(T reuse, SourceBuffer source);
>
> void ensureBufferSufficientlyFilled(SourceBuffer source);
>
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
>
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
>
> void copy(DataInputView in, TargetBuffer buffer);
>
> void copy(SourceBuffer buffer, DataOutputView out);
>
> void copy(DataInputView source, DataOutputView target);
> {code}
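The partial-deserialization idea behind the proposed `getOffsetForField` and `deserializeField` can be illustrated with a hypothetical record layout. In this sketch, `java.nio.ByteBuffer` stands in for the proposed `TargetBuffer`/`SourceBuffer`, and both the layout (a table of `int` field offsets followed by length-prefixed UTF-8 strings) and the class name `OffsetTableRecords` are assumptions for illustration, not part of the proposal. Reading one field costs a table lookup and a length read, regardless of how large the fields before it are.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical layout: [int offset per field][int length + UTF-8 bytes per field].
 * All offsets in the table are relative to the record start (`base`).
 */
final class OffsetTableRecords {

    private OffsetTableRecords() {}

    // Writes the record at `base` and returns its size in bytes.
    static int serialize(String[] fields, ByteBuffer target, int base) {
        int pos = base + 4 * fields.length;          // field data follows the offset table
        for (int i = 0; i < fields.length; i++) {
            byte[] bytes = fields[i].getBytes(StandardCharsets.UTF_8);
            target.putInt(base + 4 * i, pos - base); // table entry for field i
            target.putInt(pos, bytes.length);        // length prefix
            for (int j = 0; j < bytes.length; j++) {
                target.put(pos + 4 + j, bytes[j]);
            }
            pos += 4 + bytes.length;
        }
        return pos - base;
    }

    // Absolute position of field `logicalPos`: one table lookup, no decoding.
    static int getOffsetForField(int logicalPos, int base, ByteBuffer source) {
        return base + source.getInt(base + 4 * logicalPos);
    }

    // Decodes only the requested field; the other fields are never read.
    static String deserializeField(int logicalPos, int base, ByteBuffer source) {
        int offset = getOffsetForField(logicalPos, base, source);
        byte[] bytes = new byte[source.getInt(offset)];
        for (int j = 0; j < bytes.length; j++) {
            bytes[j] = source.get(offset + 4 + j);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The offset table costs 4 bytes per field; a layout with only fixed-width fields could compute each offset arithmetically and skip the table.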
--
This message was sent by Atlassian JIRA
(v6.2#6252)