[ https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070230#comment-14070230 ]
ASF GitHub Bot commented on FLINK-987:
--------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/77#discussion_r15226179
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/AbstractPagedOutputView.java ---
@@ -20,397 +20,233 @@
package org.apache.flink.runtime.memorymanager;
import java.io.IOException;
-import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
/**
- * The base class for all output views that are backed by multiple memory pages. This base class contains all
- * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
- * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
- * <p>
- * The paging assumes that all memory segments are of the same size.
+ * The base class for all output views that are backed by multiple memory pages which are not always kept in memory
+ * but which are supposed to be spilled after being written to. The concrete sub classes must
+ * implement the methods to collect the current page and provide the next memory page once the boundary is crossed or
+ * once the underlying buffers are unlocked by a call to {@link #unlock()}.
*/
-public abstract class AbstractPagedOutputView implements DataOutputView {
-
- private MemorySegment currentSegment; // the current memory segment to write to
-
- protected final int segmentSize; // the size of the memory segments
-
- protected final int headerLength; // the number of bytes to skip at the beginning of each segment
-
- private int positionInSegment; // the offset in the current segment
-
- private byte[] utfBuffer; // the reusable array for UTF encodings
-
-
+public abstract class AbstractPagedOutputView extends AbstractMemorySegmentOutputView {
+
+ // Can we just return the current segment when we get the next one or do we have to keep
+ // them because {@link lock} was called. We need a counter because of nested locking.
+ protected int lockCount;
+
+ // The segments that are currently locked. When {@link lock} is called we lock the current
+ // segment and all the segments that we "advance" to. Only when {@link unlock} is called can we return
+ // all of the segments but the most recent one.
+ private List<MemorySegment> lockedSegments;
+
+ // Index of current segment in list of locked segments. When we are not locked this is set to 0, because
+ // then we always only have one segment.
+ private int currentSegmentIndex;
+
//
--------------------------------------------------------------------------------------------
// Constructors
//
--------------------------------------------------------------------------------------------
-
+
/**
* Creates a new output view that writes initially to the given initial segment. All segments in the
* view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
* at the beginning of each segment.
- *
+ *
* @param initialSegment The segment that the view starts writing to.
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
+ super(initialSegment, segmentSize, headerLength);
if (initialSegment == null) {
throw new NullPointerException("Initial Segment may not be null");
}
- this.segmentSize = segmentSize;
- this.headerLength = headerLength;
this.currentSegment = initialSegment;
this.positionInSegment = headerLength;
+
+ lockedSegments = new ArrayList<MemorySegment>();
+
+ // at the beginning we only have one segment
+ currentSegmentIndex = 0;
+ lockCount = 0;
}
-
+
/**
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
- protected AbstractPagedOutputView(int segmentSize, int headerLength)
- {
- this.segmentSize = segmentSize;
- this.headerLength = headerLength;
+ protected AbstractPagedOutputView(int segmentSize, int headerLength) {
+ super(segmentSize, headerLength);
+
+ lockedSegments = new ArrayList<MemorySegment>();
+ currentSegmentIndex = 0;
+ lockCount = 0;
}
-
+
//
--------------------------------------------------------------------------------------------
// Page Management
//
--------------------------------------------------------------------------------------------
-
+
/**
- *
+ *
* This method must return a segment. If no more segments are available, it must throw an
--- End diff --
I think you phrased these comments for subclasses that implement the methods,
but they might confuse someone who only reads the (inherited) comment later.
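For readers of the inherited comments, the lock/unlock bookkeeping described in the diff (a nesting counter, a list of locked segments, and releasing all but the most recent segment on unlock) can be modelled roughly as follows. This is a minimal sketch under stated assumptions, not the code in this PR: pages are plain byte[] arrays instead of MemorySegments, the class LockingPagedWriter and its methods are hypothetical, and "returning" a page just means adding it to a spilled list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the locking described in the diff; not Flink's AbstractPagedOutputView.
public class LockingPagedWriter {
    private final int pageSize;
    private final Deque<byte[]> freePages = new ArrayDeque<>();
    private final List<byte[]> lockedPages = new ArrayList<>();  // kept alive while lockCount > 0
    private final List<byte[]> spilledPages = new ArrayList<>(); // pages handed back ("spilled")
    private byte[] current;
    private int position;
    private int lockCount; // a counter, not a boolean, so nested lock() calls are supported

    public LockingPagedWriter(int pageSize, int numPages) {
        if (numPages < 1) throw new IllegalArgumentException("need at least one page");
        this.pageSize = pageSize;
        for (int i = 0; i < numPages; i++) {
            freePages.add(new byte[pageSize]);
        }
        this.current = freePages.poll();
    }

    public void lock() {
        if (lockCount++ == 0) {
            lockedPages.add(current); // outermost lock: start keeping the current page
        }
    }

    public void unlock() {
        if (lockCount == 0) {
            throw new IllegalStateException("unlock() without matching lock()");
        }
        if (--lockCount == 0) {
            // Outermost unlock: release every locked page except the one still being written.
            for (byte[] page : lockedPages) {
                if (page != current) {
                    spilledPages.add(page);
                }
            }
            lockedPages.clear();
        }
    }

    public void write(byte b) {
        if (position == pageSize) {
            advance();
        }
        current[position++] = b;
    }

    private void advance() {
        byte[] next = freePages.poll();
        if (next == null) {
            throw new IllegalStateException("no more pages available");
        }
        if (lockCount > 0) {
            lockedPages.add(next);     // locked: keep every page we advance to
        } else {
            spilledPages.add(current); // unlocked: the full page can be handed back at once
        }
        current = next;
        position = 0;
    }

    public int spilledCount() {
        return spilledPages.size();
    }

    public static void main(String[] args) {
        LockingPagedWriter w = new LockingPagedWriter(4, 4);
        for (int i = 0; i < 4; i++) w.write((byte) i); // fills page 0
        w.lock();
        w.write((byte) 4);                             // advances to page 1; page 0 is kept
        w.lock();                                      // nested lock
        for (int i = 0; i < 4; i++) w.write((byte) i); // advances to page 2
        w.unlock();
        System.out.println(w.spilledCount());          // prints 0: still locked once
        w.unlock();
        System.out.println(w.spilledCount());          // prints 2: pages 0 and 1; page 2 is current
    }
}
```

The counter is what makes nested locking safe: pages are released only when the outermost unlock() brings the count back to zero, and the page currently being written is never released early.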
> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---------------------------------------------------------------------------
>
> Key: FLINK-987
> URL: https://issues.apache.org/jira/browse/FLINK-987
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Affects Versions: 0.6-incubating
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
> Fix For: 0.6-incubating
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest
> changing the way that the TypeSerializers/Comparators and
> DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary
> representation of data types, and to allow partial deserialization of
> individual fields. Both are currently prevented by the fact that the
> abstraction of the memory (into which the data goes) is a stream abstraction
> ({{DataInputView}}, {{DataOutputView}}).
> An idea is to offer a random-access, buffer-like view for construction and
> random-access deserialization, as well as various methods to copy elements in
> binary form between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
>
> T deserialize(T reuse, SourceBuffer source);
>
> void ensureBufferSufficientlyFilled(SourceBuffer source);
>
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
>
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
>
> void copy(DataInputView in, TargetBuffer buffer);
>
> void copy(SourceBuffer buffer, DataOutputView out);
>
> void copy(DataInputView source, DataOutputView target);
> {code}
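The proposal's two goals, partial deserialization via field offsets and binary copying without materializing records, can be illustrated with a small sketch. Assumptions: the issue does not define SourceBuffer or TargetBuffer, so java.nio.ByteBuffer stands in for both; the record (int id, String name, long count), its length-prefixed layout, the class RecordLayoutSketch, and the specialized deserializeCount (in place of the generic deserializeField) are hypothetical and not part of Flink. getOffsetForField mirrors the proposed signature and skips the variable-length name by reading only its length prefix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical record layout: [id: 4 bytes][nameLen: 4 bytes][name: nameLen UTF-8 bytes][count: 8 bytes]
public class RecordLayoutSketch {

    // Writes one record at the buffer's current position and returns the number of bytes written.
    public static long serialize(int id, String name, long count, ByteBuffer target) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        int start = target.position();
        target.putInt(id);
        target.putInt(nameBytes.length);
        target.put(nameBytes);
        target.putLong(count);
        return target.position() - start;
    }

    // Absolute offset of field logicalPos (0 = id, 1 = name incl. length prefix, 2 = count)
    // for a record starting at offset. Only the name's length prefix is read, never its bytes.
    public static int getOffsetForField(int logicalPos, int offset, ByteBuffer buffer) {
        switch (logicalPos) {
            case 0: return offset;
            case 1: return offset + 4;
            case 2: return offset + 8 + buffer.getInt(offset + 4);
            default: throw new IllegalArgumentException("no field " + logicalPos);
        }
    }

    // Partial deserialization: reads only the count field with an absolute get (position unchanged).
    public static long deserializeCount(int offset, ByteBuffer buffer) {
        return buffer.getLong(getOffsetForField(2, offset, buffer));
    }

    // Copies one record between streams without building a String or record object.
    public static void copy(DataInput in, DataOutput out) throws IOException {
        out.writeInt(in.readInt());   // id
        int nameLen = in.readInt();
        byte[] nameBytes = new byte[nameLen];
        in.readFully(nameBytes);
        out.writeInt(nameLen);
        out.write(nameBytes);         // raw bytes, never decoded
        out.writeLong(in.readLong()); // count
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(64);
        serialize(7, "flink", 42L, buf);
        int second = buf.position(); // 21: offset of the second record
        serialize(8, "memory segment", 99L, buf);

        System.out.println(deserializeCount(0, buf));      // prints 42
        System.out.println(deserializeCount(second, buf)); // prints 99

        ByteArrayOutputStream dst = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(dst);
        copy(new DataInputStream(new ByteArrayInputStream(buf.array(), 0, second)), out);
        out.flush();
        System.out.println(Arrays.equals(dst.toByteArray(), Arrays.copyOf(buf.array(), second))); // prints true
    }
}
```

Because ByteBuffer and DataOutputStream both default to big-endian, the buffer and stream forms of a record are byte-identical, so copy can move records between the two representations unchanged. A real serializer would need a per-field-type rule for skipping fields, which is what getOffsetForField encapsulates here.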
--
This message was sent by Atlassian JIRA
(v6.2#6252)