Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang merged PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r189274


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   I tried make boundchecker as a singleton, but then is can't access the index 
in memory buffer. Since memory buffer won't be created frequently. The object 
creation cost should be acceptable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


Munoon commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r133195


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Yeah, sound reasonable. I just trying to find a way to avoid unnecessary 
object allocation, but can't find any better solution...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Could you elaborate more how `MemoryBuffer` read from the 
`InputStream`/`Channel`? There still needs to be an interface for such cases. 
That's what `AbstractStreamReader` is used for here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555455805


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Note: we can't make `MemoryBuffer` as a polymorphic class, it will make a 
large degeneration to performance



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Could you elaborate more how `MemoryBuffer` read from the 
`InputStream`/`Channel`? There still a interface for such cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Could you elaborate more how `MemoryBuffer` read from the 
`InputStream`/`Channel`? There still a interface for such cases. That's what 
`AbstractStreamReader` is used for here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


Munoon commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555436359


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -100,24 +105,30 @@ public final class MemoryBuffer {
* @param length buffer size
*/
   private MemoryBuffer(byte[] buffer, int offset, int length) {
-Preconditions.checkArgument(offset >= 0 && length >= 0);
+this(buffer, offset, length, null);
+  }
+
+  /**
+   * Creates a new memory buffer that represents the memory of the byte array.
+   *
+   * @param buffer The byte array whose memory is represented by this memory 
buffer.
+   * @param offset The offset of the sub array to be used; must be 
non-negative and no larger than
+   * array.length.
+   * @param length buffer size
+   * @param streamReader a reader for reading from a stream.
+   */
+  private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader 
streamReader) {
+checkArgument(offset >= 0 && length >= 0);
 if (offset + length > buffer.length) {
   throw new IllegalArgumentException(
   String.format("%d exceeds buffer size %d", offset + length, 
buffer.length));
 }
 initHeapBuffer(buffer, offset, length);
-  }
-
-  private void initHeapBuffer(byte[] buffer, int offset, int length) {
-if (buffer == null) {
-  throw new NullPointerException("buffer");
+if (streamReader != null) {
+  this.streamReader = streamReader;
+} else {
+  this.streamReader = new BoundChecker();

Review Comment:
   Proposal: maybe instead of `BoundChecker` class we can make `MemoryBuffer` 
class itself extending `AbstractStreamReader`? In this case we can avoid new 
class initialization by simply passing `this` to the `streamReader` field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-08 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2042137153

   @theweipeng @Munoon @PragmaTwice @LiangliangSui If there are no further 
concerns, I will merge this PR by the end of this day


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554889981


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Changing this name makes this PR a little messy, how about renaming it in 
another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555011188


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() {
 return arr;
   }
 
-  public byte[] readBytesAlignedSizeEmbedded() {
+  public byte[] readBytesWithAlignedSize() {
 final int numBytes = readPositiveAlignedVarInt();
 int readerIdx = readerIndex;
+final byte[] arr = new byte[numBytes];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIndex(%d) + length(%d) exceeds size(%d): %s",
-  readerIdx, numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readTo(arr, 0, numBytes);
+  return arr;
 }
-final byte[] arr = new byte[numBytes];
 Platform.UNSAFE.copyMemory(
 this.heapMemory, this.address + readerIdx, arr, 
Platform.BYTE_ARRAY_OFFSET, numBytes);
 readerIndex = readerIdx + numBytes;
 return arr;
   }
 
-  /**
-   * This method should be used to read data written by {@link
-   * #writePrimitiveArrayWithSizeEmbedded}.
-   */
-  public char[] readCharsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  /** This method should be used to read data written by {@link 
#writePrimitiveArrayWithSize}. */
+  public char[] readChars(int numBytes) {
 int readerIdx = readerIndex;
+final char[] chars = new char[numBytes >> 1];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, 0, numBytes);
+  return chars;
 }
-final char[] chars = new char[numBytes / 2];
 Platform.copyMemory(
 heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return chars;
   }
 
-  public char[] readCharsAlignedSizeEmbedded() {
-final int numBytes = readPositiveAlignedVarInt();
+  public void readChars(char[] chars, int offset, int numBytes) {
 final int readerIdx = readerIndex;
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, offset, numBytes);
+  return;
 }
-final char[] chars = new char[numBytes / 2];
-Platform.copyMemory(
-heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
+Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, 
numBytes);
 readerIndex = readerIdx + numBytes;
-return chars;
   }
 
-  public long[] readLongsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  public char[] readCharsWithAlignedSize() {
+final int numBytes = readPositiveAlignedVarInt();
+return readChars(numBytes);
+  }
+
+  public long[] readLongs(int numBytes) {
 int readerIdx = readerIndex;
+int numElements = numBytes >> 3;
+final long[] longs = new long[numElements];
 // use subtract to avoid overflow
 if (readerIdx > size - numBytes) {
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  streamReader.readToUnsafe(longs, 0, numElements);
+  return longs;
 }
-final long[] longs = new long[numBytes / 8];
 Platform.copyMemory(
 heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return longs;
   }
 
-  public void readChars(char[] chars, int offset, int numBytes) {
-final int readerIdx = readerIndex;
-// use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  /**
+   * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object 
and pointer. NOTE: This
+   * is a unsafe method, no check here, please be carefully.
+   */
+  public void readToUnsafe(Object target, long targetPointer, int numBytes) {
+int remaining = size - readerIndex;
+if (numBytes > remaining) {
+  streamReader.readToUnsafe(target, targetPointer, num

Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554991906


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() {
 return arr;
   }
 
-  public byte[] readBytesAlignedSizeEmbedded() {
+  public byte[] readBytesWithAlignedSize() {
 final int numBytes = readPositiveAlignedVarInt();
 int readerIdx = readerIndex;
+final byte[] arr = new byte[numBytes];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIndex(%d) + length(%d) exceeds size(%d): %s",
-  readerIdx, numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readTo(arr, 0, numBytes);
+  return arr;
 }
-final byte[] arr = new byte[numBytes];
 Platform.UNSAFE.copyMemory(
 this.heapMemory, this.address + readerIdx, arr, 
Platform.BYTE_ARRAY_OFFSET, numBytes);
 readerIndex = readerIdx + numBytes;
 return arr;
   }
 
-  /**
-   * This method should be used to read data written by {@link
-   * #writePrimitiveArrayWithSizeEmbedded}.
-   */
-  public char[] readCharsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  /** This method should be used to read data written by {@link 
#writePrimitiveArrayWithSize}. */
+  public char[] readChars(int numBytes) {
 int readerIdx = readerIndex;
+final char[] chars = new char[numBytes >> 1];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, 0, numBytes);
+  return chars;
 }
-final char[] chars = new char[numBytes / 2];
 Platform.copyMemory(
 heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return chars;
   }
 
-  public char[] readCharsAlignedSizeEmbedded() {
-final int numBytes = readPositiveAlignedVarInt();
+  public void readChars(char[] chars, int offset, int numBytes) {
 final int readerIdx = readerIndex;
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, offset, numBytes);
+  return;
 }
-final char[] chars = new char[numBytes / 2];
-Platform.copyMemory(
-heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
+Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, 
numBytes);
 readerIndex = readerIdx + numBytes;
-return chars;
   }
 
-  public long[] readLongsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  public char[] readCharsWithAlignedSize() {
+final int numBytes = readPositiveAlignedVarInt();
+return readChars(numBytes);
+  }
+
+  public long[] readLongs(int numBytes) {
 int readerIdx = readerIndex;
+int numElements = numBytes >> 3;
+final long[] longs = new long[numElements];
 // use subtract to avoid overflow
 if (readerIdx > size - numBytes) {
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  streamReader.readToUnsafe(longs, 0, numElements);
+  return longs;
 }
-final long[] longs = new long[numBytes / 8];
 Platform.copyMemory(
 heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return longs;
   }
 
-  public void readChars(char[] chars, int offset, int numBytes) {
-final int readerIdx = readerIndex;
-// use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  /**
+   * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object 
and pointer. NOTE: This
+   * is a unsafe method, no check here, please be carefully.
+   */
+  public void readToUnsafe(Object target, long targetPointer, int numBytes) {
+int remaining = size - readerIndex;
+if (numBytes > remaining) {
+  streamReader.readToUnsafe(target, targetPointer, num

Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554981574


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() {
 return arr;
   }
 
-  public byte[] readBytesAlignedSizeEmbedded() {
+  public byte[] readBytesWithAlignedSize() {
 final int numBytes = readPositiveAlignedVarInt();
 int readerIdx = readerIndex;
+final byte[] arr = new byte[numBytes];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIndex(%d) + length(%d) exceeds size(%d): %s",
-  readerIdx, numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readTo(arr, 0, numBytes);
+  return arr;
 }
-final byte[] arr = new byte[numBytes];
 Platform.UNSAFE.copyMemory(
 this.heapMemory, this.address + readerIdx, arr, 
Platform.BYTE_ARRAY_OFFSET, numBytes);
 readerIndex = readerIdx + numBytes;
 return arr;
   }
 
-  /**
-   * This method should be used to read data written by {@link
-   * #writePrimitiveArrayWithSizeEmbedded}.
-   */
-  public char[] readCharsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  /** This method should be used to read data written by {@link 
#writePrimitiveArrayWithSize}. */
+  public char[] readChars(int numBytes) {
 int readerIdx = readerIndex;
+final char[] chars = new char[numBytes >> 1];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, 0, numBytes);
+  return chars;
 }
-final char[] chars = new char[numBytes / 2];
 Platform.copyMemory(
 heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return chars;
   }
 
-  public char[] readCharsAlignedSizeEmbedded() {
-final int numBytes = readPositiveAlignedVarInt();
+  public void readChars(char[] chars, int offset, int numBytes) {
 final int readerIdx = readerIndex;
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, offset, numBytes);
+  return;
 }
-final char[] chars = new char[numBytes / 2];
-Platform.copyMemory(
-heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
+Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, 
numBytes);
 readerIndex = readerIdx + numBytes;
-return chars;
   }
 
-  public long[] readLongsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  public char[] readCharsWithAlignedSize() {
+final int numBytes = readPositiveAlignedVarInt();
+return readChars(numBytes);
+  }
+
+  public long[] readLongs(int numBytes) {
 int readerIdx = readerIndex;
+int numElements = numBytes >> 3;
+final long[] longs = new long[numElements];
 // use subtract to avoid overflow
 if (readerIdx > size - numBytes) {
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  streamReader.readToUnsafe(longs, 0, numElements);
+  return longs;
 }
-final long[] longs = new long[numBytes / 8];
 Platform.copyMemory(
 heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return longs;
   }
 
-  public void readChars(char[] chars, int offset, int numBytes) {
-final int readerIdx = readerIndex;
-// use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  /**
+   * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object 
and pointer. NOTE: This
+   * is a unsafe method, no check here, please be carefully.
+   */
+  public void readToUnsafe(Object target, long targetPointer, int numBytes) {
+int remaining = size - readerIndex;
+if (numBytes > remaining) {
+  streamReader.readToUnsafe(target, targetPointer, num

Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


Munoon commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554977142


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() {
 return arr;
   }
 
-  public byte[] readBytesAlignedSizeEmbedded() {
+  public byte[] readBytesWithAlignedSize() {
 final int numBytes = readPositiveAlignedVarInt();
 int readerIdx = readerIndex;
+final byte[] arr = new byte[numBytes];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIndex(%d) + length(%d) exceeds size(%d): %s",
-  readerIdx, numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readTo(arr, 0, numBytes);
+  return arr;
 }
-final byte[] arr = new byte[numBytes];
 Platform.UNSAFE.copyMemory(
 this.heapMemory, this.address + readerIdx, arr, 
Platform.BYTE_ARRAY_OFFSET, numBytes);
 readerIndex = readerIdx + numBytes;
 return arr;
   }
 
-  /**
-   * This method should be used to read data written by {@link
-   * #writePrimitiveArrayWithSizeEmbedded}.
-   */
-  public char[] readCharsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  /** This method should be used to read data written by {@link 
#writePrimitiveArrayWithSize}. */
+  public char[] readChars(int numBytes) {
 int readerIdx = readerIndex;
+final char[] chars = new char[numBytes >> 1];
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, 0, numBytes);
+  return chars;
 }
-final char[] chars = new char[numBytes / 2];
 Platform.copyMemory(
 heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return chars;
   }
 
-  public char[] readCharsAlignedSizeEmbedded() {
-final int numBytes = readPositiveAlignedVarInt();
+  public void readChars(char[] chars, int offset, int numBytes) {
 final int readerIdx = readerIndex;
 // use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+if (readerIdx > size - numBytes) {
+  streamReader.readToUnsafe(chars, offset, numBytes);
+  return;
 }
-final char[] chars = new char[numBytes / 2];
-Platform.copyMemory(
-heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, 
numBytes);
+Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, 
numBytes);
 readerIndex = readerIdx + numBytes;
-return chars;
   }
 
-  public long[] readLongsWithSizeEmbedded() {
-final int numBytes = readPositiveVarInt();
+  public char[] readCharsWithAlignedSize() {
+final int numBytes = readPositiveAlignedVarInt();
+return readChars(numBytes);
+  }
+
+  public long[] readLongs(int numBytes) {
 int readerIdx = readerIndex;
+int numElements = numBytes >> 3;
+final long[] longs = new long[numElements];
 // use subtract to avoid overflow
 if (readerIdx > size - numBytes) {
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  streamReader.readToUnsafe(longs, 0, numElements);
+  return longs;
 }
-final long[] longs = new long[numBytes / 8];
 Platform.copyMemory(
 heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, 
numBytes);
 readerIndex = readerIdx + numBytes;
 return longs;
   }
 
-  public void readChars(char[] chars, int offset, int numBytes) {
-final int readerIdx = readerIndex;
-// use subtract to avoid overflow
-if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) 
{
-  throw new IndexOutOfBoundsException(
-  String.format(
-  "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, 
numBytes, size, this));
+  /**
+   * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object 
and pointer. NOTE: This
+   * is a unsafe method, no check here, please be carefully.
+   */
+  public void readToUnsafe(Object target, long targetPointer, int numBytes) {
+int remaining = size - readerIndex;
+if (numBytes > remaining) {
+  streamReader.readToUnsafe(target, targetPointer, numBytes

Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041437927

   Hi @Munoon @LiangliangSui @theweipeng @PragmaTwice , this pr is ready for 
review, please take a look at it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-07 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554889981


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Change this make this PR a little messy, how about renaming it in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-06 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041128594

   New readBytesString are too large for inline:
   
![image](https://github.com/apache/incubator-fury/assets/12445254/e76ccf29-a30f-4eab-8b16-c817d553dd05)
   
   Old version:
   
![image](https://github.com/apache/incubator-fury/assets/12445254/458406d6-7a14-4de1-bd7c-fd698865ba67)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-06 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041104708

   Method spilit threshold:
   - 15
   ```
   Benchmark  (bufferType)  (objectType)  
(references)   Mode  CntScoreError  Units
   UserTypeDeserializeSuite.fury_deserialize arraySTRUCT
 false  thrpt   10  4333464.647 ± 247025.147  ops/s
   UserTypeDeserializeSuite.fury_deserialize arraySTRUCT
  true  thrpt   10  3643508.209 ± 470967.165  ops/s
   UserTypeDeserializeSuite.fury_deserialize  directBufferSTRUCT
 false  thrpt   10  3907185.065 ± 267600.435  ops/s
   UserTypeDeserializeSuite.fury_deserialize  directBufferSTRUCT
  true  thrpt   10  3961404.504 ± 415918.311  ops/s
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-04 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2037714411

   > This PR introduce 25% performance degeneration, we can''t merge it
   
   I managed to make it faster than version without streaming reading. I'll 
split those optimization into other PR, then we can merge this PR later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2036205425

   This PR introduce 25% performance degeneration, we can''t merge it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550850495


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Make sense, I''ll update it later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


Munoon commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550233226


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Maybe `limit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550230543


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Do you have any suggestions? I think size is still fine, 
bytearrayOutputStream use count, ByteBuffer use capacity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


Munoon commented on code in PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550066465


##
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java:
##
@@ -90,6 +92,7 @@ public final class MemoryBuffer {
   private int size;

Review Comment:
   Since the `size` field did not represent actual buffer _size_, maybe it 
worth renaming it? WDYT @chaokunyang?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034915355

   > > Hi @Munoon, I implemented streaming deserialization for input stream. 
Maybe it can be taken as an example for java.nio.Channel. The implementation is 
a littl tricky, maybe there is a better way.
   > 
   > Hi, @chaokunyang I feel like I'm ready with my PR, but I'm not sure how to 
send it, since it is built on top of your PR. I think that I will just wait 
until this PR will be merged. Let me know if you have a better idea.
   > 
   > Also, some methods of my implementation are fully copied from your 
`FuryInputStream` and some are partially. I think those can be extracted in the 
abstract class, but I'll leave it as is to make a PR simpler. Maybe it worth 
making a refactoring in a separate PR.
   
   Yep, we can wait this PR merged, than you can filing a new PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034897915

   I think it's important to our users, so I plan to include this PR in our 
0.5.0 release. The original streaming is error-prone when using, and not a real 
streaming implementation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034895369

   Hi @Munoon @LiangliangSui @knutwannheden @PragmaTwice This PR is ready to 
review, could you help review it?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034892088

   
   > ```java
   >   public void increaseReaderIndex(int diff) {
   > int readerIdx = readerIndex + diff;
   > if (readerIdx < 0) {
   >   throw new IndexOutOfBoundsException(
   >   String.format(
   >   "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", 
readerIdx, size));
   > } else if (readerIdx > size) {
   >   int readCount = streamReader.fillBuffer(diff);
   >   if (diff > readCount) {
   > throw new IndexOutOfBoundsException(
   > String.format(
   > "readerIndex: %d (expected: 0 <= readerIndex <= 
size(%d))", readerIdx, size));
   >   }
   > }
   > this.readerIndex = readerIdx;
   >   }
   > ```
   
   Nice catch, I fixed it, but fill buffer by 
`streamReader.fillBuffer(readerIdx - size);`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


Munoon commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034646795

   Oh, and by the way, I've also received an exception when testing my channel 
in `MemoryBuffer#increaseReaderIndex` method. I guess, current implementation 
in this PR is incorrect, since the `MemoryBuffer#size` property didn't 
represent actual size of this buffer no more.
   
   I've fixed this method as follow:
   ```java
 public void increaseReaderIndex(int diff) {
   int readerIdx = readerIndex + diff;
   if (readerIdx < 0) {
 throw new IndexOutOfBoundsException(
 String.format(
 "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", 
readerIdx, size));
   } else if (readerIdx > size) {
 int readCount = streamReader.fillBuffer(diff);
 if (diff > readCount) {
   throw new IndexOutOfBoundsException(
   String.format(
   "readerIndex: %d (expected: 0 <= readerIndex <= 
size(%d))", readerIdx, size));
 }
   }
   this.readerIndex = readerIdx;
 }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


Munoon commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034623840

   > Hi @Munoon, I implemented streaming deserialization for input stream. 
Maybe it can be taken as an example for java.nio.Channel. The implementation is 
a littl tricky, maybe there is a better way.
   
   Hi, @chaokunyang 
   I feel like I'm ready with my PR, but I'm not sure how to send it, since it 
is built on top of your PR. I think that I will just wait until this PR will be 
merged. Let me know if you have a better idea.
   
   Also, some methods of my implementation are fully copied from your 
`FuryInputStream` and some are partially. I think those can be extracted in the 
abstract class, but I'll leave it as is to make a PR simpler. Maybe it worth 
making a refactoring in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org



Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]

2024-04-03 Thread via GitHub


chaokunyang commented on PR #1451:
URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034334074

   Hi @Munoon, I implemented streaming deserialization for input stream. Maybe 
it can be taken as an example for java.nio.Channel. The implementation is a 
littl tricky, maybe there is a better way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org