[ https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420554#comment-15420554 ]
Guoqiang Li commented on SPARK-6235:
------------------------------------

[~hvanhovell] The main changes:

1. Replace the DiskStore method {{def getBytes(blockId: BlockId): ChunkedByteBuffer}} with {{def getBlockData(blockId: BlockId): ManagedBuffer}}.
2. ManagedBuffer's {{nioByteBuffer}} method returns a {{ChunkedByteBuffer}}.
3. Add a class {{ChunkFetchInputStream}}, used for flow control; its full code is given after the sketch below.
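For context, a minimal sketch of the {{ManagedBuffer}} contract implied by changes 1 and 2. Only the {{nioByteBuffer}} return type is actually part of this change; the remaining methods simply mirror the existing {{ManagedBuffer}} API (the same methods that {{InputStreamManagedBuffer}} below implements) and are shown only to make the sketch self-contained:

{noformat}
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.InputStream;

// Sketch only: nioByteBuffer() now returns a ChunkedByteBuffer instead of a single
// java.nio.ByteBuffer, so one buffer is no longer capped at 2GB.
public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ChunkedByteBuffer nioByteBuffer() throws IOException;   // change 2
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}
{noformat}

On the Scala side, change 1 then has {{DiskStore.getBlockData(blockId: BlockId): ManagedBuffer}} hand back one of these instead of materializing the whole block as a {{ChunkedByteBuffer}}.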
{noformat}
package org.apache.spark.network.client;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.primitives.UnsignedBytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ChunkedByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportFrameDecoder;

/**
 * An InputStream fed with ByteBuf frames by a TransportFrameDecoder.Interceptor.
 * Incoming frames are queued in a bounded buffer; when the queue backs up, the channel's
 * auto-read is disabled and further reads are requested manually as the consumer drains
 * the stream, providing flow control for chunks larger than 2GB.
 */
public class ChunkFetchInputStream extends InputStream {
  private final Logger logger = LoggerFactory.getLogger(ChunkFetchInputStream.class);

  private final TransportResponseHandler handler;
  private final Channel channel;
  private final StreamChunkId streamId;
  private final long byteCount;
  private final ChunkReceivedCallback callback;
  private final LinkedBlockingQueue<ByteBuf> buffers = new LinkedBlockingQueue<>(1024);

  public final TransportFrameDecoder.Interceptor interceptor;

  private ByteBuf curChunk;
  private boolean isCallbacked = false;
  private long writerIndex = 0;

  private final AtomicReference<Throwable> cause = new AtomicReference<>(null);
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  public ChunkFetchInputStream(
      TransportResponseHandler handler,
      Channel channel,
      StreamChunkId streamId,
      long byteCount,
      ChunkReceivedCallback callback) {
    this.handler = handler;
    this.channel = channel;
    this.streamId = streamId;
    this.byteCount = byteCount;
    this.callback = callback;
    this.interceptor = new StreamInterceptor();
  }

  @Override
  public int read() throws IOException {
    if (isClosed.get()) return -1;
    pullChunk();
    if (curChunk != null) {
      byte b = curChunk.readByte();
      return UnsignedBytes.toInt(b);
    } else {
      return -1;
    }
  }

  @Override
  public int read(byte[] dest, int offset, int length) throws IOException {
    if (isClosed.get()) return -1;
    pullChunk();
    if (curChunk != null) {
      int amountToGet = Math.min(curChunk.readableBytes(), length);
      curChunk.readBytes(dest, offset, amountToGet);
      return amountToGet;
    } else {
      return -1;
    }
  }

  @Override
  public long skip(long bytes) throws IOException {
    if (isClosed.get()) return 0L;
    pullChunk();
    if (curChunk != null) {
      int amountToSkip = (int) Math.min(bytes, curChunk.readableBytes());
      curChunk.skipBytes(amountToSkip);
      return amountToSkip;
    } else {
      return 0L;
    }
  }

  @Override
  public void close() throws IOException {
    if (!isClosed.get()) {
      releaseCurChunk();
      isClosed.set(true);
      resetChannel();
      Iterator<ByteBuf> itr = buffers.iterator();
      while (itr.hasNext()) {
        itr.next().release();
      }
      buffers.clear();
    }
  }

  private void pullChunk() throws IOException {
    if (curChunk != null && !curChunk.isReadable()) releaseCurChunk();
    if (curChunk == null && cause.get() == null && !isClosed.get()) {
      try {
        curChunk = buffers.take();
        // If auto-read has been disabled, channel.read() is not invoked automatically,
        // so request the next read manually here.
        if (!channel.config().isAutoRead()) channel.read();
      } catch (Throwable e) {
        setCause(e);
      }
    }
    if (cause.get() != null) throw new IOException(cause.get());
  }

  private void setCause(Throwable e) {
    if (cause.get() == null) cause.set(e);
  }

  private void releaseCurChunk() {
    if (curChunk != null) {
      curChunk.release();
      curChunk = null;
    }
  }

  private void onSuccess() throws IOException {
    if (isCallbacked) return;
    if (cause.get() != null) {
      callback.onFailure(streamId.chunkIndex, cause.get());
    } else {
      InputStream inputStream = new LimitedInputStream(this, byteCount);
      ManagedBuffer managedBuffer = new InputStreamManagedBuffer(inputStream, byteCount);
      callback.onSuccess(streamId.chunkIndex, managedBuffer);
    }
    isCallbacked = true;
  }

  private void resetChannel() {
    if (!channel.config().isAutoRead()) {
      channel.config().setAutoRead(true);
      channel.read();
    }
  }

  private class StreamInterceptor implements TransportFrameDecoder.Interceptor {

    @Override
    public void exceptionCaught(Throwable e) throws Exception {
      handler.deactivateStream();
      setCause(e);
      logger.trace("exceptionCaught", e);
      onSuccess();
      resetChannel();
    }

    @Override
    public void channelInactive() throws Exception {
      handler.deactivateStream();
      setCause(new ClosedChannelException());
      logger.trace("channelInactive", cause.get());
      onSuccess();
      resetChannel();
    }

    @Override
    public boolean handle(ByteBuf buf) throws Exception {
      try {
        ByteBuf frame = nextBufferForFrame(byteCount - writerIndex, buf);
        int available = frame.readableBytes();
        writerIndex += available;
        mayTrafficSuspension();
        if (!isClosed.get() && available > 0) {
          buffers.put(frame);
          if (writerIndex > byteCount) {
            setCause(new IllegalStateException(String.format(
              "Read too many bytes? Expected %d, but read %d.", byteCount, writerIndex)));
            handler.deactivateStream();
          } else if (writerIndex == byteCount) {
            handler.deactivateStream();
          }
        } else {
          frame.release();
        }
        logger.trace(streamId + ", writerIndex " + writerIndex + " byteCount, " + byteCount);
        onSuccess();
      } catch (Exception e) {
        setCause(e);
        resetChannel();
      }
      return writerIndex != byteCount;
    }

    /**
     * Takes the first buffer in the internal list, and either adjusts it to fit in the frame
     * (by taking a slice out of it) or removes it from the internal list.
     */
    private ByteBuf nextBufferForFrame(long bytesToRead, ByteBuf buf) {
      int slen = (int) Math.min(buf.readableBytes(), bytesToRead);
      ByteBuf frame;
      if (slen == buf.readableBytes()) {
        frame = buf.retain().readSlice(slen);
      } else {
        frame = buf.alloc().buffer(slen);
        buf.readBytes(frame);
        frame.retain();
      }
      return frame;
    }

    private void mayTrafficSuspension() {
      // If too many chunks are already buffered, disable auto-read so that further
      // reads have to be requested manually via channel.read() (back-pressure).
      if (channel.config().isAutoRead() && buffers.size() > 31) {
        channel.config().setAutoRead(false);
      }
      if (writerIndex >= byteCount) resetChannel();
    }
  }

  private class InputStreamManagedBuffer extends ManagedBuffer {
    private final InputStream inputStream;
    private final long byteCount;

    InputStreamManagedBuffer(InputStream inputStream, long byteCount) {
      this.inputStream = inputStream;
      this.byteCount = byteCount;
    }

    public long size() {
      return byteCount;
    }

    public ChunkedByteBuffer nioByteBuffer() throws IOException {
      throw new UnsupportedOperationException("nioByteBuffer");
    }

    public InputStream createInputStream() throws IOException {
      return inputStream;
    }

    public ManagedBuffer retain() {
      // throw new UnsupportedOperationException("retain");
      return this;
    }

    public ManagedBuffer release() {
      // throw new UnsupportedOperationException("release");
      return this;
    }

    public Object convertToNetty() throws IOException {
      throw new UnsupportedOperationException("convertToNetty");
    }
  }
}
{noformat}
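For illustration, a hedged sketch of how the receiving side might consume the streamed chunk. The {{ChunkReceivedCallback}} interface and the {{InputStreamManagedBuffer}} delivered to {{onSuccess}} come from the code above; the class name, scratch-buffer size, and the way the bytes are drained are illustrative assumptions, not part of the patch:

{noformat}
import java.io.IOException;
import java.io.InputStream;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;

// Sketch only: the chunk is consumed incrementally through the InputStream-backed
// ManagedBuffer instead of being materialized as a single array or ByteBuffer.
public class StreamingChunkCallback implements ChunkReceivedCallback {
  @Override
  public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
    // buffer is the InputStreamManagedBuffer handed over by ChunkFetchInputStream.onSuccess().
    // Reading from it drains the bounded ByteBuf queue, which in turn triggers
    // channel.read() once auto-read has been switched off (the flow control above).
    try (InputStream in = buffer.createInputStream()) {
      byte[] scratch = new byte[64 * 1024];  // illustrative buffer size
      int n;
      while ((n = in.read(scratch)) != -1) {
        // hand the n bytes to the block/shuffle writer; the whole (possibly > 2GB)
        // chunk never lives in one array or ByteBuffer
      }
    } catch (IOException e) {
      onFailure(chunkIndex, e);
    }
  }

  @Override
  public void onFailure(int chunkIndex, Throwable e) {
    // surface the error to whoever requested the chunk
  }
}
{noformat}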
> Address various 2G limits
> -------------------------
>
>                 Key: SPARK-6235
>                 URL: https://issues.apache.org/jira/browse/SPARK-6235
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>
> An umbrella ticket to track the various 2G limits we have in Spark, due to the use of byte arrays and ByteBuffers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org