[ https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420554#comment-15420554 ]

Guoqiang Li commented on SPARK-6235:
------------------------------------

[~hvanhovell]
The main changes:

1. Replace the DiskStore method {{def getBytes(blockId: BlockId): ChunkedByteBuffer}} with {{def getBlockData(blockId: BlockId): ManagedBuffer}}.

2. Change ManagedBuffer's {{nioByteBuffer}} method to return a {{ChunkedByteBuffer}}.

3. Add a new class, {{ChunkFetchInputStream}}, used for flow control. Its code is as follows (a sketch of point 1 and an example of consuming the resulting buffer appear after the code block):

{noformat}

package org.apache.spark.network.client;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.primitives.UnsignedBytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ChunkedByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportFrameDecoder;

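/**
 * An InputStream fed with chunk frames by a TransportFrameDecoder.Interceptor
 * as they arrive on the Netty channel. Reads block until data is available;
 * the channel's auto-read flag is toggled to throttle the sender when too
 * many buffers are queued (see mayTrafficSuspension).
 */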
public class ChunkFetchInputStream extends InputStream {
  private final Logger logger = LoggerFactory.getLogger(ChunkFetchInputStream.class);

  private final TransportResponseHandler handler;
  private final Channel channel;
  private final StreamChunkId streamId;
  private final long byteCount;
  private final ChunkReceivedCallback callback;
  private final LinkedBlockingQueue<ByteBuf> buffers = new LinkedBlockingQueue<>(1024);
  public final TransportFrameDecoder.Interceptor interceptor;

  private ByteBuf curChunk;
  private boolean isCallbacked = false;
  private long writerIndex = 0;

  private final AtomicReference<Throwable> cause = new AtomicReference<>(null);
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  public ChunkFetchInputStream(
      TransportResponseHandler handler,
      Channel channel,
      StreamChunkId streamId,
      long byteCount,
      ChunkReceivedCallback callback) {
    this.handler = handler;
    this.channel = channel;
    this.streamId = streamId;
    this.byteCount = byteCount;
    this.callback = callback;
    this.interceptor = new StreamInterceptor();
  }

  @Override
  public int read() throws IOException {
    if (isClosed.get()) return -1;
    pullChunk();
    if (curChunk != null) {
      byte b = curChunk.readByte();
      return UnsignedBytes.toInt(b);
    } else {
      return -1;
    }
  }

  @Override
  public int read(byte[] dest, int offset, int length) throws IOException {
    if (isClosed.get()) return -1;
    pullChunk();
    if (curChunk != null) {
      int amountToGet = Math.min(curChunk.readableBytes(), length);
      curChunk.readBytes(dest, offset, amountToGet);
      return amountToGet;
    } else {
      return -1;
    }
  }

  @Override
  public long skip(long bytes) throws IOException {
    if (isClosed.get()) return 0L;
    pullChunk();
    if (curChunk != null) {
      int amountToSkip = (int) Math.min(bytes, curChunk.readableBytes());
      curChunk.skipBytes(amountToSkip);
      return amountToSkip;
    } else {
      return 0L;
    }
  }

  @Override
  public void close() throws IOException {
    if (!isClosed.get()) {
      releaseCurChunk();
      isClosed.set(true);
      resetChannel();
      Iterator<ByteBuf> itr = buffers.iterator();
      while (itr.hasNext()) {
        itr.next().release();
      }
      buffers.clear();
    }
  }

  private void pullChunk() throws IOException {
    if (curChunk != null && !curChunk.isReadable()) releaseCurChunk();
    if (curChunk == null && cause.get() == null && !isClosed.get()) {
      try {
        curChunk = buffers.take();
        // If auto-read has been disabled (see mayTrafficSuspension), request
        // the next chunk of data from the channel manually.
        if (!channel.config().isAutoRead()) channel.read();
      } catch (Throwable e) {
        setCause(e);
      }
    }
    if (cause.get() != null) throw new IOException(cause.get());
  }

  private void setCause(Throwable e) {
    if (cause.get() == null) cause.set(e);
  }

  private void releaseCurChunk() {
    if (curChunk != null) {
      curChunk.release();
      curChunk = null;
    }
  }

  private void onSuccess() throws IOException {
    if (isCallbacked) return;
    if (cause.get() != null) {
      callback.onFailure(streamId.chunkIndex, cause.get());
    } else {
      InputStream inputStream = new LimitedInputStream(this, byteCount);
      ManagedBuffer managedBuffer = new InputStreamManagedBuffer(inputStream, byteCount);
      callback.onSuccess(streamId.chunkIndex, managedBuffer);
    }
    isCallbacked = true;
  }

  private void resetChannel() {
    if (!channel.config().isAutoRead()) {
      channel.config().setAutoRead(true);
      channel.read();
    }
  }

  private class StreamInterceptor implements TransportFrameDecoder.Interceptor {
    @Override
    public void exceptionCaught(Throwable e) throws Exception {
      handler.deactivateStream();
      setCause(e);
      logger.trace("exceptionCaught", e);
      onSuccess();
      resetChannel();
    }

    @Override
    public void channelInactive() throws Exception {
      handler.deactivateStream();
      setCause(new ClosedChannelException());
      logger.trace("channelInactive", cause.get());
      onSuccess();
      resetChannel();
    }

    @Override
    public boolean handle(ByteBuf buf) throws Exception {
      try {
        ByteBuf frame = nextBufferForFrame(byteCount - writerIndex, buf);
        int available = frame.readableBytes();
        writerIndex += available;
        mayTrafficSuspension();
        if (!isClosed.get() && available > 0) {
          buffers.put(frame);
          if (writerIndex > byteCount) {
            setCause(new IllegalStateException(String.format(
                "Read too many bytes? Expected %d, but read %d.", byteCount, writerIndex)));
            handler.deactivateStream();
          } else if (writerIndex == byteCount) {
            handler.deactivateStream();
          }
        } else {
          frame.release();
        }
        logger.trace("{}: writerIndex {}, byteCount {}", streamId, writerIndex, byteCount);
        onSuccess();
      } catch (Exception e) {
        setCause(e);
        resetChannel();
      }
      return writerIndex != byteCount;
    }

    /**
     * Adjusts the incoming buffer to the remaining frame size: if the whole
     * buffer fits within the frame, takes a retained slice of it; otherwise
     * copies just the frame's remaining bytes into a new buffer.
     */
    private ByteBuf nextBufferForFrame(long bytesToRead, ByteBuf buf) {
      int slen = (int) Math.min(buf.readableBytes(), bytesToRead);
      ByteBuf frame;
      if (slen == buf.readableBytes()) {
        frame = buf.retain().readSlice(slen);
      } else {
        // A freshly allocated buffer already has a reference count of 1, so
        // no extra retain() is needed here; retaining again would leak it.
        frame = buf.alloc().buffer(slen);
        buf.readBytes(frame);
      }
      return frame;
    }

    private void mayTrafficSuspension() {
      // If too many chunks are buffered, disable auto-read to throttle the
      // producer; pullChunk() then calls channel.read() manually as the
      // consumer drains the queue.
      if (channel.config().isAutoRead() && buffers.size() > 31) {
        channel.config().setAutoRead(false);
      }
      // Once the full frame has been received, restore auto-read.
      if (writerIndex >= byteCount) resetChannel();
    }
  }

  private class InputStreamManagedBuffer extends ManagedBuffer {
    private final InputStream inputStream;
    private final long byteCount;

    InputStreamManagedBuffer(InputStream inputStream, long byteCount) {
      this.inputStream = inputStream;
      this.byteCount = byteCount;
    }

    public long size() {
      return byteCount;
    }

    public ChunkedByteBuffer nioByteBuffer() throws IOException {
      throw new UnsupportedOperationException("nioByteBuffer");
    }

    public InputStream createInputStream() throws IOException {
      return inputStream;
    }

    public ManagedBuffer retain() {
      // Reference counting is a no-op for this one-shot, stream-backed buffer.
      return this;
    }

    public ManagedBuffer release() {
      // Reference counting is a no-op for this one-shot, stream-backed buffer.
      return this;
    }

    public Object convertToNetty() throws IOException {
      throw new UnsupportedOperationException("convertToNetty");
    }
  }
}

{noformat}
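For context, here is a minimal sketch of what point 1 could look like on the DiskStore side. This is an illustration only, not the actual patch: {{DiskBlockManagerSketch}} is a hypothetical stand-in for Spark's DiskBlockManager, and the file handling is simplified.

{noformat}
// Hypothetical sketch of point 1: DiskStore hands out a ManagedBuffer
// instead of materializing the whole block as a ChunkedByteBuffer, so a
// block larger than 2G never needs one contiguous ByteBuffer in memory.
import java.io.File

import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.TransportConf
import org.apache.spark.storage.BlockId

// Hypothetical stand-in for Spark's DiskBlockManager.
trait DiskBlockManagerSketch {
  def getFile(blockId: BlockId): File
}

class DiskStoreSketch(conf: TransportConf, diskManager: DiskBlockManagerSketch) {
  // Replaces: def getBytes(blockId: BlockId): ChunkedByteBuffer
  def getBlockData(blockId: BlockId): ManagedBuffer = {
    val file = diskManager.getFile(blockId)
    // FileSegmentManagedBuffer streams the file lazily; nothing is read
    // eagerly into a single byte array or ByteBuffer.
    new FileSegmentManagedBuffer(conf, file, 0, file.length)
  }
}
{noformat}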
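And a usage sketch of how a fetch callback might consume the streamed chunk. The callback body and the output file are assumptions for illustration, not part of the change:

{noformat}
import java.io.{FileOutputStream, InputStream}

import org.apache.spark.network.buffer.ManagedBuffer

object ChunkConsumerSketch {
  // Hypothetical consumer: reads the chunk incrementally through the
  // InputStream handed out by InputStreamManagedBuffer, so no single
  // 2G buffer is ever materialized on the receiving side.
  def onChunkSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit = {
    val in: InputStream = buffer.createInputStream()
    val out = new FileOutputStream(s"chunk-$chunkIndex.dat") // assumed sink
    try {
      val bytes = new Array[Byte](64 * 1024)
      var n = in.read(bytes)
      while (n != -1) {
        out.write(bytes, 0, n)
        n = in.read(bytes)
      }
    } finally {
      out.close()
      // Closing the stream resets the channel's auto-read flag and releases
      // any ByteBufs still queued in ChunkFetchInputStream.
      in.close()
    }
  }
}
{noformat}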

> Address various 2G limits
> -------------------------
>
>                 Key: SPARK-6235
>                 URL: https://issues.apache.org/jira/browse/SPARK-6235
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>
> An umbrella ticket to track the various 2G limits we have in Spark, due to
> the use of byte arrays and ByteBuffers.


