[ https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070232#comment-14070232 ]

ASF GitHub Bot commented on FLINK-987:
--------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/77#discussion_r15226341
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/AbstractPagedOutputView.java ---
    @@ -20,397 +20,233 @@
     package org.apache.flink.runtime.memorymanager;
     
     import java.io.IOException;
    -import java.io.UTFDataFormatException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import com.google.common.base.Preconditions;
     
    -import org.apache.flink.core.memory.DataInputView;
    -import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.core.memory.MemorySegment;
     
     
     /**
    - * The base class for all output views that are backed by multiple memory pages. This base class contains all
    - * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
    - * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
    - * <p>
    - * The paging assumes that all memory segments are of the same size.
    + * The base class for all output views that are backed by multiple memory pages which are not always kept in memory
    + * but which are supposed to be spilled after being written to. The concrete sub classes must
    + * implement the methods to collect the current page and provide the next memory page once the boundary is crossed or
    + * once the underlying buffers are unlocked by a call to {@link #unlock()}.
      */
    -public abstract class AbstractPagedOutputView implements DataOutputView {
    -   
    -   private MemorySegment currentSegment;                   // the current memory segment to write to
    -   
    -   protected final int segmentSize;                                // the size of the memory segments
    -   
    -   protected final int headerLength;                               // the number of bytes to skip at the beginning of each segment
    -   
    -   private int positionInSegment;                                  // the offset in the current segment
    -   
    -   private byte[] utfBuffer;                                               // the reusable array for UTF encodings
    -   
    -   
    +public abstract class AbstractPagedOutputView extends AbstractMemorySegmentOutputView {
    +
    +   // Can we just return the current segment when we get the next one or do we have to keep
    +   // them because {@link lock} was called. We need a counter because of nested locking.
    +   protected int lockCount;
    +
    +   // The segments that are currently locked. When {@link lock} is called we lock the current
    +   // segment and all the segments that we "advance" to. Only when {@link unlock} is called can we return
    +   // all of the segments but the most recent one.
    +   private List<MemorySegment> lockedSegments;
    +
    +   // Index of current segment in list of locked segments. When we are not locked this is set to 0, because
    +   // then we always only have one segment.
    +   private int currentSegmentIndex;
    +
        // --------------------------------------------------------------------------------------------
        //                                    Constructors
        // --------------------------------------------------------------------------------------------
    -   
    +
        /**
         * Creates a new output view that writes initially to the given initial segment. All segments in the
         * view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
         * at the beginning of each segment.
    -    * 
    +    *
         * @param initialSegment The segment that the view starts writing to.
         * @param segmentSize The size of the memory segments.
         * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
         */
        protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
    +           super(initialSegment, segmentSize, headerLength);
    --- End diff --
    
    Oh I see. Why do we check again here then?
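
For readers following the diff, the nested-locking bookkeeping described in the field comments (a lock counter plus a list of held-back segments) might look roughly like the sketch below. This is only an illustration under stated assumptions, not the code from the pull request: the class name LockingPagedViewSketch, the advance() method, and the returnedPages list are hypothetical, and pages are plain byte arrays rather than MemorySegments.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for the paged output view in the diff. Pages are byte arrays,
 * and "returning" a page (e.g. for spilling) is modelled by adding it to a list.
 */
class LockingPagedViewSketch {

    private final int pageSize;
    private int lockCount;                                         // > 0 while a lock() is outstanding
    private final List<byte[]> lockedPages = new ArrayList<>();    // pages held back while locked
    private final List<byte[]> returnedPages = new ArrayList<>();  // pages handed back so far
    private byte[] currentPage;

    LockingPagedViewSketch(int pageSize) {
        this.pageSize = pageSize;
        this.currentPage = new byte[pageSize];
    }

    /** Starts (or nests) a locked region; the outermost call pins the current page. */
    void lock() {
        if (lockCount++ == 0) {
            lockedPages.add(currentPage);
        }
    }

    /** Ends one level of locking; the outermost call returns all pages but the most recent. */
    void unlock() {
        if (lockCount == 0) {
            throw new IllegalStateException("unlock() without matching lock()");
        }
        if (--lockCount == 0) {
            for (int i = 0; i < lockedPages.size() - 1; i++) {
                returnedPages.add(lockedPages.get(i));
            }
            lockedPages.clear(); // the most recent page stays in use as currentPage
        }
    }

    /** Moves on to a fresh page, as would happen when a page boundary is crossed. */
    void advance() {
        byte[] next = new byte[pageSize];
        if (lockCount > 0) {
            lockedPages.add(next);          // keep every page we advance to while locked
        } else {
            returnedPages.add(currentPage); // not locked: the finished page can go right away
        }
        currentPage = next;
    }

    int lockedPageCount() {
        return lockedPages.size();
    }

    int returnedPageCount() {
        return returnedPages.size();
    }
}
```

With two nested lock() calls followed by two advance() calls, nothing is returned until the second unlock(), which is why a counter rather than a boolean flag is needed.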


> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-987
>                 URL: https://issues.apache.org/jira/browse/FLINK-987
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 0.6-incubating
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>             Fix For: 0.6-incubating
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest 
> changing the way that the TypeSerializers/Comparators and 
> DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary 
> representation of data types, and to allow partial deserialization of 
> individual fields. Both are currently prevented by the fact that the 
> abstraction of the memory (into which the data goes) is a stream abstraction 
> ({{DataInputView}}, {{DataOutputView}}).
> One idea is to offer a random-access, buffer-like view for construction and 
> random-access deserialization, as well as various methods to copy elements in 
> binary form between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
>       
> T deserialize(T reuse, SourceBuffer source);
>       
> void ensureBufferSufficientlyFilled(SourceBuffer source);
>       
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
>       
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
>       
> void copy(DataInputView in, TargetBuffer buffer);
>       
> void copy(SourceBuffer buffer, DataOutputView out);
>       
> void copy(DataInputView source, DataOutputView target);
> {code}
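
To make the idea of random-access, partial deserialization more concrete, here is a minimal sketch. It assumes java.nio.ByteBuffer as a stand-in for the proposed SourceBuffer/TargetBuffer types (which are not defined in this issue), and the RecordSketch and RecordSerializerSketch classes and their fixed field layout are hypothetical. It only illustrates how a single field could be read by offset without deserializing the whole record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical three-field record used only for this illustration. */
final class RecordSketch {
    final int id;
    final long timestamp;
    final String name;

    RecordSketch(int id, long timestamp, String name) {
        this.id = id;
        this.timestamp = timestamp;
        this.name = name;
    }
}

/** Assumed binary layout: [int id][long timestamp][int nameLength][UTF-8 name bytes]. */
final class RecordSerializerSketch {

    /** Writes the record at the buffer's current position and returns the bytes written. */
    static long serialize(RecordSketch record, ByteBuffer target) {
        int start = target.position();
        byte[] nameBytes = record.name.getBytes(StandardCharsets.UTF_8);
        target.putInt(record.id);
        target.putLong(record.timestamp);
        target.putInt(nameBytes.length);
        target.put(nameBytes);
        return target.position() - start;
    }

    /**
     * Maps a logical field position to a byte offset. The buffer is unused here because
     * all preceding fields are fixed-length; a layout with variable-length fields in
     * front would have to read lengths from the buffer.
     */
    static int getOffsetForField(int logicalPos, int offset, ByteBuffer buffer) {
        switch (logicalPos) {
            case 0: return offset;
            case 1: return offset + Integer.BYTES;
            case 2: return offset + Integer.BYTES + Long.BYTES;
            default: throw new IllegalArgumentException("No field at position " + logicalPos);
        }
    }

    /** Reads one field of the record starting at {@code offset}, leaving the others untouched. */
    static Object deserializeField(int logicalPos, int offset, ByteBuffer source) {
        int fieldOffset = getOffsetForField(logicalPos, offset, source);
        switch (logicalPos) {
            case 0:
                return source.getInt(fieldOffset);   // absolute read, position unchanged
            case 1:
                return source.getLong(fieldOffset);
            default:
                int length = source.getInt(fieldOffset);
                byte[] bytes = new byte[length];
                ByteBuffer view = source.duplicate(); // independent position, shared content
                view.position(fieldOffset + Integer.BYTES);
                view.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
        }
    }
}
```

A stream abstraction would have to read the id and timestamp before reaching the name; with absolute offsets, a comparator could fetch only the field it needs.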



--
This message was sent by Atlassian JIRA
(v6.2#6252)
