[ https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070236#comment-14070236 ]

ASF GitHub Bot commented on FLINK-987:
--------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/77#discussion_r15226580
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/AbstractPagedOutputView.java ---
    @@ -20,397 +20,233 @@
     package org.apache.flink.runtime.memorymanager;
     
     import java.io.IOException;
    -import java.io.UTFDataFormatException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import com.google.common.base.Preconditions;
     
    -import org.apache.flink.core.memory.DataInputView;
    -import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.core.memory.MemorySegment;
     
     
     /**
    - * The base class for all output views that are backed by multiple memory pages. This base class contains all
    - * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
    - * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
    - * <p>
    - * The paging assumes that all memory segments are of the same size.
    + * The base class for all output views that are backed by multiple memory pages which are not always kept in memory
    + * but which are supposed to be spilled after being written to. The concrete sub classes must
    + * implement the methods to collect the current page and provide the next memory page once the boundary is crossed or
    + * once the underlying buffers are unlocked by a call to {@link #unlock()}.
      */
    -public abstract class AbstractPagedOutputView implements DataOutputView {
    -   
    -   private MemorySegment currentSegment;                   // the current memory segment to write to
    -   
    -   protected final int segmentSize;                                // the size of the memory segments
    -   
    -   protected final int headerLength;                               // the number of bytes to skip at the beginning of each segment
    -   
    -   private int positionInSegment;                                  // the offset in the current segment
    -   
    -   private byte[] utfBuffer;                                               // the reusable array for UTF encodings
    -   
    -   
    +public abstract class AbstractPagedOutputView extends AbstractMemorySegmentOutputView {
    +
    +   // Can we just return the current segment when we get the next one or do we have to keep
    +   // them because {@link lock} was called. We need a counter because of nested locking.
    +   protected int lockCount;
    +
    +   // The segments that are currently locked. When {@link lock} is called we lock the current
    +   // segment and all the segments that we "advance" to. Only when {@link unlock} is called can we return
    +   // all of the segments but the most recent one.
    +   private List<MemorySegment> lockedSegments;
    +
    +   // Index of current segment in list of locked segments. When we are not locked this is set to 0, because
    +   // then we always only have one segment.
    +   private int currentSegmentIndex;
    +
        // --------------------------------------------------------------------------------------------
        //                                    Constructors
        // --------------------------------------------------------------------------------------------
    -   
    +
        /**
         * Creates a new output view that writes initially to the given initial segment. All segments in the
         * view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
         * at the beginning of each segment.
    -    * 
    +    *
         * @param initialSegment The segment that the view starts writing to.
         * @param segmentSize The size of the memory segments.
         * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
         */
        protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
    +           super(initialSegment, segmentSize, headerLength);
                if (initialSegment == null) {
                        throw new NullPointerException("Initial Segment may not be null");
                }
    -           this.segmentSize = segmentSize;
    -           this.headerLength = headerLength;
                this.currentSegment = initialSegment;
                this.positionInSegment = headerLength;
    +
    +           lockedSegments = new ArrayList<MemorySegment>();
    +
    +           // at the beginning we only have one segment
    +           currentSegmentIndex = 0;
    +           lockCount = 0;
        }
    -   
    +
        /**
         * @param segmentSize The size of the memory segments.
         * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
         */
    -   protected AbstractPagedOutputView(int segmentSize, int headerLength)
    -   {
    -           this.segmentSize = segmentSize;
    -           this.headerLength = headerLength;
    +   protected AbstractPagedOutputView(int segmentSize, int headerLength) {
    +           super(segmentSize, headerLength);
    +
    +           lockedSegments = new ArrayList<MemorySegment>();
    +           currentSegmentIndex = 0;
    +           lockCount = 0;
        }
    -   
    +
     
        // --------------------------------------------------------------------------------------------
        //                                  Page Management
        // --------------------------------------------------------------------------------------------
    -   
    +
        /**
    -    * 
    +    *
         * This method must return a segment. If no more segments are available, it must throw an
         * {@link java.io.EOFException}.
    -    * 
    -    * @param current The current memory segment
    -    * @param positionInCurrent The position in the segment, one after the 
last valid byte.
    -    * @return The next memory segment. 
    -    * 
    -    * @throws IOException
    +    *
    +    * @return The next memory segment.
    +    *
    +    * @throws java.io.IOException
    +    */
    +
    +   protected abstract MemorySegment requestSegment() throws IOException;
    +
    +   /**
    +    *
    +    * This method returns a memory segment that has been filled to where it came from. Child classes
    +    * are responsible for handling the segments.
    +    *
    +    * @param segment The  memory segment
    +    * @param bytesWritten The position in the segment, one after the last valid byte.
    +    *
    +    * @throws java.io.IOException
         */
    -   protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
    -   
    -   
    +   protected abstract void returnSegment(MemorySegment segment, int bytesWritten) throws IOException;
    +
    +
        /**
         * Gets the segment that the view currently writes to.
    -    * 
    +    *
         * @return The segment the view currently writes to.
         */
        public MemorySegment getCurrentSegment() {
                return this.currentSegment;
        }
    -   
    +
        /**
         * Gets the current write position (the position where the next bytes will be written)
         * in the current memory segment.
    -    * 
    +    *
         * @return The current write offset in the current memory segment.
         */
        public int getCurrentPositionInSegment() {
                return this.positionInSegment;
        }
    -   
    +
        /**
         * Gets the size of the segments used by this view.
    -    * 
    +    *
         * @return The memory segment size.
         */
        public int getSegmentSize() {
                return this.segmentSize;
        }
    -   
    +
        /**
         * Moves the output view to the next page. This method invokes internally the
    -    * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass' 
    -    * implementation and obtain the next segment to write to. Writing will continue inside the new segment
    -    * after the header.
    -    * 
    -    * @throws IOException Thrown, if the current segment could not be processed or a new segment could not
    -    *                     be obtained. 
    +    * {@link #requestSegment()} and {@link #returnSegment(org.apache.flink.core.memory.MemorySegment, int)} methods
    +    * to give the current memory segment to the concrete subclass' implementation and obtain the next segment to
    +    * write to. Writing will continue inside the new segment after the header.
    +    *
    +    * @throws java.io.IOException Thrown, if the current segment could not be processed or a new segment could not
    +    *                     be obtained.
         */
    -   protected void advance() throws IOException {
    -           this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
    -           this.positionInSegment = this.headerLength;
    -   }
    -   
    -   /**
    -    * Sets the internal state to the given memory segment and the given position within the segment. 
    -    * 
    -    * @param seg The memory segment to write the next bytes to.
    -    * @param position The position to start writing the next bytes to.
    -    */
    -   protected void seekOutput(MemorySegment seg, int position) {
    -           this.currentSegment = seg;
    -           this.positionInSegment = position;
    -   }
    -   
    -   /**
    -    * Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
    -    * {@link #seekOutput(MemorySegment, int)} is called. 
    -    * 
    -    * @see #advance()
    -    * @see #seekOutput(MemorySegment, int)
    -    */
    -   protected void clear() {
    -           this.currentSegment = null;
    -           this.positionInSegment = this.headerLength;
    -   }
    -   
    -   // --------------------------------------------------------------------------------------------
    -   //                               Data Output Specific methods
    -   // --------------------------------------------------------------------------------------------
    -   
    -   @Override
    -   public void write(int b) throws IOException {
    -           writeByte(b);
    -   }
    -
    -   @Override
    -   public void write(byte[] b) throws IOException {
    -           write(b, 0, b.length);
    -   }
    -
        @Override
    -   public void write(byte[] b, int off, int len) throws IOException {
    -           int remaining = this.segmentSize - this.positionInSegment;
    -           if (remaining >= len) {
    -                   this.currentSegment.put(this.positionInSegment, b, off, len);
    -                   this.positionInSegment += len;
    -           }
    -           else {
    -                   if (remaining == 0) {
    -                           advance();
    -                           remaining = this.segmentSize - this.positionInSegment;
    -                   }
    -                   while (true) {
    -                           int toPut = Math.min(remaining, len);
    -                           this.currentSegment.put(this.positionInSegment, b, off, toPut);
    -                           off += toPut;
    -                           len -= toPut;
    -                           
    -                           if (len > 0) {
    -                                   this.positionInSegment = this.segmentSize;
    -                                   advance();
    -                                   remaining = this.segmentSize - this.positionInSegment;
    -                           }
    -                           else {
    -                                   this.positionInSegment += toPut;
    -                                   break;
    -                           }
    +   protected void advance() throws IOException {
    +           if (lockCount > 0) {
    +                   if (currentSegmentIndex < lockedSegments.size() - 1) {
    +                           currentSegmentIndex++;
    +                           currentSegment = lockedSegments.get(currentSegmentIndex);
    +                   } else {
    +                           currentSegmentIndex++;
    +                           currentSegment = requestSegment();
    +                           lockedSegments.add(currentSegment);
                        }
    +           } else {
    +                   returnSegment(currentSegment, positionInSegment);
    +                   currentSegment = requestSegment();
    +                   currentSegmentIndex = 0;
                }
    +           positionInSegment = headerLength;
        }
     
    -   @Override
    -   public void writeBoolean(boolean v) throws IOException {
    -           writeByte(v ? 1 : 0);
    -   }
    -
    -   @Override
    -   public void writeByte(int v) throws IOException {
    -           if (this.positionInSegment < this.segmentSize) {
    -                   this.currentSegment.put(this.positionInSegment++, (byte) v);
    -           }
    -           else {
    -                   advance();
    -                   writeByte(v);
    +   public void lock() {
    +           lockCount++;
    +           if (lockCount == 1) {
    +                   // we are the first to lock, so start the "locked segments" list
    +                   Preconditions.checkState(lockedSegments.isEmpty(), "List of locked segments must be empty.");
    +                   Preconditions.checkState(currentSegmentIndex == 0, "Before locking currentSegmentIndex must always be 0.");
    +                   lockedSegments.add(currentSegment);
                }
        }
     
    -   @Override
    -   public void writeShort(int v) throws IOException {
    -           if (this.positionInSegment < this.segmentSize - 1) {
    -                   this.currentSegment.putShort(this.positionInSegment, (short) v);
    -                   this.positionInSegment += 2;
    -           }
    -           else if (this.positionInSegment == this.segmentSize) {
    -                   advance();
    -                   writeShort(v);
    -           }
    -           else {
    -                   writeByte(v >> 8);
    -                   writeByte(v);
    -           }
    -   }
    -
    -   @Override
    -   public void writeChar(int v) throws IOException {
    -           if (this.positionInSegment < this.segmentSize - 1) {
    -                   this.currentSegment.putChar(this.positionInSegment, (char) v);
    -                   this.positionInSegment += 2;
    -           }
    -           else if (this.positionInSegment == this.segmentSize) {
    -                   advance();
    -                   writeChar(v);
    -           }
    -           else {
    -                   writeByte(v >> 8);
    -                   writeByte(v);
    -           }
    -   }
    -
    -   @Override
    -   public void writeInt(int v) throws IOException {
    -           if (this.positionInSegment < this.segmentSize - 3) {
    -                   this.currentSegment.putIntBigEndian(this.positionInSegment, v);
    -                   this.positionInSegment += 4;
    -           }
    -           else if (this.positionInSegment == this.segmentSize) {
    -                   advance();
    -                   writeInt(v);
    -           }
    -           else {
    -                   writeByte(v >> 24);
    -                   writeByte(v >> 16);
    -                   writeByte(v >>  8);
    -                   writeByte(v);
    -           }
    -   }
    +   // here positions are relative to first locked segment
    +   public long tell() {
    +           Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
     
    -   @Override
    -   public void writeLong(long v) throws IOException {
    -           if (this.positionInSegment < this.segmentSize - 7) {
    -                   this.currentSegment.putLongBigEndian(this.positionInSegment, v);
    -                   this.positionInSegment += 8;
    -           }
    -           else if (this.positionInSegment == this.segmentSize) {
    -                   advance();
    -                   writeLong(v);
    -           }
    -           else {
    -                   writeByte((int) (v >> 56));
    -                   writeByte((int) (v >> 48));
    -                   writeByte((int) (v >> 40));
    -                   writeByte((int) (v >> 32));
    -                   writeByte((int) (v >> 24));
    -                   writeByte((int) (v >> 16));
    -                   writeByte((int) (v >>  8));
    -                   writeByte((int) v);
    -           }
    -   }
    -
    -   @Override
    -   public void writeFloat(float v) throws IOException {
    -           writeInt(Float.floatToRawIntBits(v));
    -   }
    -
    -   @Override
    -   public void writeDouble(double v) throws IOException {
    -           writeLong(Double.doubleToRawLongBits(v));
    -   }
    -
    -   @Override
    -   public void writeBytes(String s) throws IOException {
    -           for (int i = 0; i < s.length(); i++) {
    -                   writeByte(s.charAt(i));
    -           }
    +           return currentSegmentIndex * segmentSize + positionInSegment;
        }
     
    -   @Override
    -   public void writeChars(String s) throws IOException {
    -           for (int i = 0; i < s.length(); i++) {
    -                   writeChar(s.charAt(i));
    -           }
    -   }
    +   public void seek(long position) throws IOException {
    +           Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
    +           Preconditions.checkArgument(position >= 0, "position must be positive");
    --- End diff --
    
    Changing it. Man @uce is strict... :smile: 
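The locking scheme in the diff can be hard to follow. Here is a minimal, self-contained sketch of the same idea. It is not the code under review. Assumptions:

- Pages are plain byte[] arrays rather than MemorySegments, and there is no page header.
- requestPage()/returnPage() are hypothetical stand-ins for requestSegment()/returnSegment().
- The highWater field is an addition of this sketch.

The excerpt ends before the body of seek() and does not show unlock(), so both are guesses. Here unlock() hands back every locked page except the most recent one, as the comment on lockedSegments describes, and resumes writing after the furthest byte written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the lock()/advance()/tell()/seek() logic in the diff:
// pages are plain byte[] instead of Flink MemorySegments, and there is no page header.
class LockingPagedWriter {
    private final int pageSize;
    private final List<byte[]> locked = new ArrayList<>(); // pages kept while lockCount > 0
    final List<byte[]> returnedPages = new ArrayList<>();  // pages handed back (a real view might spill them)
    final List<Integer> returnedBytes = new ArrayList<>(); // valid bytes in each returned page
    private byte[] current;
    private int index;      // index of 'current' in 'locked'; 0 while unlocked
    private int position;   // next write offset inside 'current'
    private int lockCount;  // a counter, so that lock() calls may nest
    private long highWater; // furthest position written while locked (an assumption, not in the diff)

    LockingPagedWriter(int pageSize) {
        this.pageSize = pageSize;
        this.current = new byte[pageSize];
    }

    // Hypothetical stand-ins for requestSegment() and returnSegment(MemorySegment, int).
    private byte[] requestPage() { return new byte[pageSize]; }

    private void returnPage(byte[] page, int bytesWritten) {
        returnedPages.add(page);
        returnedBytes.add(bytesWritten);
    }

    private void checkLocked() {
        if (lockCount == 0) throw new IllegalStateException("Target buffer must be locked.");
    }

    private void advance() {
        if (lockCount > 0) {
            index++;
            if (index < locked.size()) {
                current = locked.get(index);   // a page already written before seek()-ing back
            } else {
                current = requestPage();
                locked.add(current);
            }
        } else {
            returnPage(current, position);     // not locked: the filled page can go back at once
            current = requestPage();
            index = 0;
        }
        position = 0;
    }

    void writeByte(int b) {
        if (position == pageSize) advance();
        current[position++] = (byte) b;
        if (lockCount > 0) highWater = Math.max(highWater, tell());
    }

    // Big-endian, one byte at a time, so the value may straddle a page boundary.
    void writeInt(int v) {
        for (int shift = 24; shift >= 0; shift -= 8) writeByte(v >>> shift);
    }

    void lock() {
        if (++lockCount == 1) {
            locked.add(current);
            index = 0;
            highWater = position;
        }
    }

    void unlock() {
        checkLocked();
        if (--lockCount > 0) return;           // still held by an outer lock()
        // Return every locked page but the last one; continue after the furthest byte written.
        int last = locked.size() - 1;
        for (int i = 0; i < last; i++) returnPage(locked.get(i), pageSize);
        current = locked.get(last);
        position = (int) (highWater - (long) last * pageSize);
        index = 0;
        locked.clear();
    }

    // As in the diff, positions are relative to the start of the first locked page.
    long tell() {
        checkLocked();
        return (long) index * pageSize + position;
    }

    void seek(long pos) {
        checkLocked();
        if (pos < 0 || pos > highWater) throw new IllegalArgumentException("bad position: " + pos);
        index = (int) (pos / pageSize);
        position = (int) (pos % pageSize);
        if (index == locked.size()) {          // exactly at the end of the last, full page
            index--;
            position = pageSize;
        }
    }
}
```

One plausible use is a length prefix whose value is only known afterwards. This is an assumption; the thread does not state it. The sequence is:

1. lock() and remember tell().
2. Write a placeholder int followed by the payload.
3. seek() back and patch in the length.
4. seek() to the end and unlock().

Counting locks instead of using a boolean lets nested callers each lock() and unlock() without releasing pages too early. That is the reason the diff gives for lockCount.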


> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-987
>                 URL: https://issues.apache.org/jira/browse/FLINK-987
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 0.6-incubating
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>             Fix For: 0.6-incubating
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest changing
> the way that the TypeSerializers/Comparators and
> DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary
> representation of data types, and to allow partial deserialization of
> individual fields. Both are currently prevented by the fact that the
> abstraction of the memory (into which the data goes) is a stream abstraction
> ({{DataInputView}}, {{DataOutputView}}).
> One idea is to offer a random-access, buffer-like view for construction and
> random-access deserialization, as well as various methods to copy elements in
> binary form between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
>       
> T deserialize(T reuse, SourceBuffer source);
>       
> void ensureBufferSufficientlyFilled(SourceBuffer source);
>       
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
>       
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
>       
> void copy(DataInputView in, TargetBuffer buffer);
>       
> void copy(SourceBuffer buffer, DataOutputView out);
>       
> void copy(DataInputView source, DataOutputView target);
> {code}
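Here is a minimal sketch of the random-access construction and partial deserialization described in the issue. Assumptions:

- TargetBuffer and SourceBuffer do not exist yet, so java.nio.ByteBuffer stands in for both.
- Person and PersonSerializer are hypothetical.
- The reuse parameter of deserializeField is dropped.

The serializer stores the offset of the variable-length city field in the record header and patches it with an absolute putInt once it is known. That lets getOffsetForField jump straight to the field without decoding name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical record type, used only for this sketch.
final class Person {
    final int age;
    final String name;
    final String city;

    Person(int age, String name, String city) {
        this.age = age;
        this.name = name;
        this.city = city;
    }
}

// Hypothetical serializer; ByteBuffer stands in for the proposed TargetBuffer/SourceBuffer.
// Record layout: [int age][int offset of city, relative to record start][name][city],
// where each string is stored as [int byte length][UTF-8 bytes].
final class PersonSerializer {

    // Writes the record at the buffer's current position and returns the number of bytes written.
    long serialize(Person p, ByteBuffer target) {
        int start = target.position();
        target.putInt(p.age);
        int citySlot = target.position();
        target.putInt(0);                                     // placeholder, patched below
        putString(target, p.name);
        target.putInt(citySlot, target.position() - start);   // absolute put: random-access fix-up
        putString(target, p.city);
        return target.position() - start;
    }

    // Absolute offset of field logicalPos (0 = age, 1 = name, 2 = city) of the record at offset.
    int getOffsetForField(int logicalPos, int offset, ByteBuffer source) {
        switch (logicalPos) {
            case 0: return offset;
            case 1: return offset + 8;
            case 2: return offset + source.getInt(offset + 4);
            default: throw new IllegalArgumentException("no field " + logicalPos);
        }
    }

    // Decodes a single field; the other fields are never read.
    Object deserializeField(int logicalPos, int offset, ByteBuffer source) {
        int at = getOffsetForField(logicalPos, offset, source);
        if (logicalPos == 0) {
            return source.getInt(at);
        }
        return getString(source, at);
    }

    Person deserialize(int offset, ByteBuffer source) {
        return new Person((Integer) deserializeField(0, offset, source),
                (String) deserializeField(1, offset, source),
                (String) deserializeField(2, offset, source));
    }

    private static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    // Uses only absolute get() calls, so the buffer's position is left untouched.
    private static String getString(ByteBuffer buf, int at) {
        byte[] bytes = new byte[buf.getInt(at)];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = buf.get(at + 4 + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With a pure stream such as DataOutputView, the offset slot could not be patched after the name has been written. That is the limitation the issue points out.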



--
This message was sent by Atlassian JIRA
(v6.2#6252)
