chaokunyang commented on code in PR #1483:
URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557454244


##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
 package org.apache.fury.io;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
 import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
 
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements 
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader, 
ReadableByteChannel {
   private final ReadableByteChannel channel;
-  private final ByteBuffer byteBuffer;
-  private final MemoryBuffer buffer;
+  private final MemoryBuffer memoryBuffer;
+  private ByteBuffer byteBuffer;
 
   public FuryReadableChannel(ReadableByteChannel channel) {
-    this(channel, ByteBuffer.allocate(4096));
+    this(channel, ByteBuffer.allocateDirect(4096));
   }
 
-  private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+  public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+    Preconditions.checkArgument(
+        directBuffer.isDirect(), "FuryReadableChannel support only direct 
ByteBuffer.");
     this.channel = channel;
     this.byteBuffer = directBuffer;
-    this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+    long offHeapAddress = Platform.getAddress(directBuffer) + 
directBuffer.position();
+    this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, 
this);
+  }
+
+  @Override
+  public int fillBuffer(int minFillSize) {
+    try {
+      ByteBuffer byteBuf = byteBuffer;
+      MemoryBuffer memoryBuf = memoryBuffer;
+      int position = byteBuf.position();
+      int newLimit = position + minFillSize;
+      if (newLimit > byteBuf.capacity()) {
+        int newSize =
+            newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) 
(newLimit * 1.5);
+        ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+        byteBuf.position(0);
+        newByteBuf.put(byteBuf);
+        byteBuf = byteBuffer = newByteBuf;
+        memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, 
byteBuf);
+      }
+      byteBuf.limit(newLimit);
+      int readCount = channel.read(byteBuf);
+      memoryBuf.increaseSize(readCount);
+      return readCount;
+    } catch (IOException e) {
+      throw new DeserializationException("Failed to read the provided byte 
channel", e);
+    }
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
-    throw new UnsupportedEncodingException();
+    int dstRemaining = dst.remaining();
+    if (dstRemaining <= 0) {
+      return 0;
+    }
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining <= 0) {
+      return -1;
+    }
+    if (remaining >= dstRemaining) {
+      byte[] bytes = buf.readBytes(dstRemaining);
+      dst.put(bytes);
+      return dstRemaining;
+    } else {
+      int filledSize = fillBuffer(dstRemaining - remaining);
+      int length = remaining + filledSize;
+      byte[] bytes = buf.readBytes(length);
+      dst.put(bytes);
+      return length;
+    }
+  }
+
+  @Override
+  public void readTo(byte[] dst, int dstIndex, int length) {
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining >= length) {
+      buf.readBytes(dst, dstIndex, length);
+    } else {
+      buf.readBytes(dst, dstIndex, remaining);
+      try {
+        ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length 
- remaining);
+        channel.read(buffer);
+      } catch (IOException e) {
+        throw new DeserializationException("Failed to read the provided byte 
channel", e);
+      }
+    }
+  }
+
+  @Override
+  public void readToUnsafe(Object target, long targetPointer, int numBytes) {
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining < numBytes) {
+      fillBuffer(numBytes - remaining);
+    }
+    long address = buf.getUnsafeReaderAddress();
+    Platform.copyMemory(null, address, target, targetPointer, numBytes);
+    buf.increaseReaderIndex(numBytes);
+  }
+
+  @Override
+  public void readToByteBuffer(ByteBuffer dst, int length) {
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining < length) {
+      remaining += fillBuffer(length - remaining);

Review Comment:
   This introduces an extra copy; the data doesn't have to be copied into 
`memoryBuffer` before being read into `dst`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org

Reply via email to