chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557454244
########## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ########## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { - this(channel, ByteBuffer.allocate(4096)); + this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + Preconditions.checkArgument( + directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; - this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + + long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); + this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { + try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { + int newSize = + newLimit < 
BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); + byteBuf.position(0); + newByteBuf.put(byteBuf); + byteBuf = byteBuffer = newByteBuf; + memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; + } catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); + } } @Override public int read(ByteBuffer dst) throws IOException { - throw new UnsupportedEncodingException(); + int dstRemaining = dst.remaining(); + if (dstRemaining <= 0) { + return 0; + } + MemoryBuffer buf = memoryBuffer; + int remaining = buf.remaining(); + if (remaining <= 0) { + return -1; + } + if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; + } else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; + } + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { + MemoryBuffer buf = memoryBuffer; + int remaining = buf.remaining(); + if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); + } else { + buf.readBytes(dst, dstIndex, remaining); + try { + ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); + channel.read(buffer); + } catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); + } + } + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { + MemoryBuffer buf = memoryBuffer; + int remaining = buf.remaining(); + if (remaining < numBytes) { + fillBuffer(numBytes - remaining); + } + long address = buf.getUnsafeReaderAddress(); + Platform.copyMemory(null, address, target, 
targetPointer, numBytes); + buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { + MemoryBuffer buf = memoryBuffer; + int remaining = buf.remaining(); + if (remaining < length) { + remaining += fillBuffer(length - remaining); Review Comment: This introduces an extra copy; the data doesn't have to be copied into `memoryBuffer` before being read into `dst`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org