Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang merged PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r189274 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: I tried make boundchecker as a singleton, but then is can't access the index in memory buffer. Since memory buffer won't be created frequently. The object creation cost should be acceptable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r133195 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Yeah, sound reasonable. I just trying to find a way to avoid unnecessary object allocation, but can't find any better solution... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Could you elaborate more how `MemoryBuffer` read from the `InputStream`/`Channel`? There still needs to be an interface for such cases. That's what `AbstractStreamReader` is used for here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555455805 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Note: we can't make `MemoryBuffer` as a polymorphic class, it will make a large degeneration to performance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Could you elaborate more how `MemoryBuffer` read from the `InputStream`/`Channel`? There still a interface for such cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555453677 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Could you elaborate more how `MemoryBuffer` read from the `InputStream`/`Channel`? There still a interface for such cases. That's what `AbstractStreamReader` is used for here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555436359 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -100,24 +105,30 @@ public final class MemoryBuffer { * @param length buffer size */ private MemoryBuffer(byte[] buffer, int offset, int length) { -Preconditions.checkArgument(offset >= 0 && length >= 0); +this(buffer, offset, length, null); + } + + /** + * Creates a new memory buffer that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory buffer. + * @param offset The offset of the sub array to be used; must be non-negative and no larger than + * array.length. + * @param length buffer size + * @param streamReader a reader for reading from a stream. + */ + private MemoryBuffer(byte[] buffer, int offset, int length, FuryStreamReader streamReader) { +checkArgument(offset >= 0 && length >= 0); if (offset + length > buffer.length) { throw new IllegalArgumentException( String.format("%d exceeds buffer size %d", offset + length, buffer.length)); } initHeapBuffer(buffer, offset, length); - } - - private void initHeapBuffer(byte[] buffer, int offset, int length) { -if (buffer == null) { - throw new NullPointerException("buffer"); +if (streamReader != null) { + this.streamReader = streamReader; +} else { + this.streamReader = new BoundChecker(); Review Comment: Proposal: maybe instead of `BoundChecker` class we can make `MemoryBuffer` class itself extending `AbstractStreamReader`? In this case we can avoid new class initialization by simply passing `this` to the `streamReader` field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2042137153 @theweipeng @Munoon @PragmaTwice @LiangliangSui If there are no further concerns, I will merge this PR by the end of this day -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554889981 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Changing this name makes this PR a little messy, how about renaming it in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1555011188 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() { return arr; } - public byte[] readBytesAlignedSizeEmbedded() { + public byte[] readBytesWithAlignedSize() { final int numBytes = readPositiveAlignedVarInt(); int readerIdx = readerIndex; +final byte[] arr = new byte[numBytes]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIndex(%d) + length(%d) exceeds size(%d): %s", - readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readTo(arr, 0, numBytes); + return arr; } -final byte[] arr = new byte[numBytes]; Platform.UNSAFE.copyMemory( this.heapMemory, this.address + readerIdx, arr, Platform.BYTE_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return arr; } - /** - * This method should be used to read data written by {@link - * #writePrimitiveArrayWithSizeEmbedded}. - */ - public char[] readCharsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + /** This method should be used to read data written by {@link #writePrimitiveArrayWithSize}. */ + public char[] readChars(int numBytes) { int readerIdx = readerIndex; +final char[] chars = new char[numBytes >> 1]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, 0, numBytes); + return chars; } -final char[] chars = new char[numBytes / 2]; Platform.copyMemory( heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return chars; } - public char[] readCharsAlignedSizeEmbedded() { -final int numBytes = readPositiveAlignedVarInt(); + public void readChars(char[] chars, int offset, int numBytes) { final int readerIdx = readerIndex; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, offset, numBytes); + return; } -final char[] chars = new char[numBytes / 2]; -Platform.copyMemory( -heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); +Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, numBytes); readerIndex = readerIdx + numBytes; -return chars; } - public long[] readLongsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + public char[] readCharsWithAlignedSize() { +final int numBytes = readPositiveAlignedVarInt(); +return readChars(numBytes); + } + + public long[] readLongs(int numBytes) { int readerIdx = readerIndex; +int numElements = numBytes >> 3; +final long[] longs = new long[numElements]; // use subtract to avoid overflow if (readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + streamReader.readToUnsafe(longs, 0, numElements); + return longs; } -final long[] longs = new long[numBytes / 8]; Platform.copyMemory( heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return longs; } - public void readChars(char[] chars, int offset, int numBytes) { -final int readerIdx = readerIndex; -// use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + /** + * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object and pointer. NOTE: This + * is a unsafe method, no check here, please be carefully. + */ + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +int remaining = size - readerIndex; +if (numBytes > remaining) { + streamReader.readToUnsafe(target, targetPointer, num
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554991906 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() { return arr; } - public byte[] readBytesAlignedSizeEmbedded() { + public byte[] readBytesWithAlignedSize() { final int numBytes = readPositiveAlignedVarInt(); int readerIdx = readerIndex; +final byte[] arr = new byte[numBytes]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIndex(%d) + length(%d) exceeds size(%d): %s", - readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readTo(arr, 0, numBytes); + return arr; } -final byte[] arr = new byte[numBytes]; Platform.UNSAFE.copyMemory( this.heapMemory, this.address + readerIdx, arr, Platform.BYTE_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return arr; } - /** - * This method should be used to read data written by {@link - * #writePrimitiveArrayWithSizeEmbedded}. - */ - public char[] readCharsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + /** This method should be used to read data written by {@link #writePrimitiveArrayWithSize}. */ + public char[] readChars(int numBytes) { int readerIdx = readerIndex; +final char[] chars = new char[numBytes >> 1]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, 0, numBytes); + return chars; } -final char[] chars = new char[numBytes / 2]; Platform.copyMemory( heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return chars; } - public char[] readCharsAlignedSizeEmbedded() { -final int numBytes = readPositiveAlignedVarInt(); + public void readChars(char[] chars, int offset, int numBytes) { final int readerIdx = readerIndex; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, offset, numBytes); + return; } -final char[] chars = new char[numBytes / 2]; -Platform.copyMemory( -heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); +Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, numBytes); readerIndex = readerIdx + numBytes; -return chars; } - public long[] readLongsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + public char[] readCharsWithAlignedSize() { +final int numBytes = readPositiveAlignedVarInt(); +return readChars(numBytes); + } + + public long[] readLongs(int numBytes) { int readerIdx = readerIndex; +int numElements = numBytes >> 3; +final long[] longs = new long[numElements]; // use subtract to avoid overflow if (readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + streamReader.readToUnsafe(longs, 0, numElements); + return longs; } -final long[] longs = new long[numBytes / 8]; Platform.copyMemory( heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return longs; } - public void readChars(char[] chars, int offset, int numBytes) { -final int readerIdx = readerIndex; -// use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + /** + * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object and pointer. NOTE: This + * is a unsafe method, no check here, please be carefully. + */ + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +int remaining = size - readerIndex; +if (numBytes > remaining) { + streamReader.readToUnsafe(target, targetPointer, num
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554981574 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() { return arr; } - public byte[] readBytesAlignedSizeEmbedded() { + public byte[] readBytesWithAlignedSize() { final int numBytes = readPositiveAlignedVarInt(); int readerIdx = readerIndex; +final byte[] arr = new byte[numBytes]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIndex(%d) + length(%d) exceeds size(%d): %s", - readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readTo(arr, 0, numBytes); + return arr; } -final byte[] arr = new byte[numBytes]; Platform.UNSAFE.copyMemory( this.heapMemory, this.address + readerIdx, arr, Platform.BYTE_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return arr; } - /** - * This method should be used to read data written by {@link - * #writePrimitiveArrayWithSizeEmbedded}. - */ - public char[] readCharsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + /** This method should be used to read data written by {@link #writePrimitiveArrayWithSize}. */ + public char[] readChars(int numBytes) { int readerIdx = readerIndex; +final char[] chars = new char[numBytes >> 1]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, 0, numBytes); + return chars; } -final char[] chars = new char[numBytes / 2]; Platform.copyMemory( heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return chars; } - public char[] readCharsAlignedSizeEmbedded() { -final int numBytes = readPositiveAlignedVarInt(); + public void readChars(char[] chars, int offset, int numBytes) { final int readerIdx = readerIndex; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, offset, numBytes); + return; } -final char[] chars = new char[numBytes / 2]; -Platform.copyMemory( -heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); +Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, numBytes); readerIndex = readerIdx + numBytes; -return chars; } - public long[] readLongsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + public char[] readCharsWithAlignedSize() { +final int numBytes = readPositiveAlignedVarInt(); +return readChars(numBytes); + } + + public long[] readLongs(int numBytes) { int readerIdx = readerIndex; +int numElements = numBytes >> 3; +final long[] longs = new long[numElements]; // use subtract to avoid overflow if (readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + streamReader.readToUnsafe(longs, 0, numElements); + return longs; } -final long[] longs = new long[numBytes / 8]; Platform.copyMemory( heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return longs; } - public void readChars(char[] chars, int offset, int numBytes) { -final int readerIdx = readerIndex; -// use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + /** + * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object and pointer. NOTE: This + * is a unsafe method, no check here, please be carefully. + */ + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +int remaining = size - readerIndex; +if (numBytes > remaining) { + streamReader.readToUnsafe(target, targetPointer, num
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554977142 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -2358,94 +2540,93 @@ public byte[] readBytesWithSizeEmbedded() { return arr; } - public byte[] readBytesAlignedSizeEmbedded() { + public byte[] readBytesWithAlignedSize() { final int numBytes = readPositiveAlignedVarInt(); int readerIdx = readerIndex; +final byte[] arr = new byte[numBytes]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIndex(%d) + length(%d) exceeds size(%d): %s", - readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readTo(arr, 0, numBytes); + return arr; } -final byte[] arr = new byte[numBytes]; Platform.UNSAFE.copyMemory( this.heapMemory, this.address + readerIdx, arr, Platform.BYTE_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return arr; } - /** - * This method should be used to read data written by {@link - * #writePrimitiveArrayWithSizeEmbedded}. - */ - public char[] readCharsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + /** This method should be used to read data written by {@link #writePrimitiveArrayWithSize}. */ + public char[] readChars(int numBytes) { int readerIdx = readerIndex; +final char[] chars = new char[numBytes >> 1]; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, 0, numBytes); + return chars; } -final char[] chars = new char[numBytes / 2]; Platform.copyMemory( heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return chars; } - public char[] readCharsAlignedSizeEmbedded() { -final int numBytes = readPositiveAlignedVarInt(); + public void readChars(char[] chars, int offset, int numBytes) { final int readerIdx = readerIndex; // use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); +if (readerIdx > size - numBytes) { + streamReader.readToUnsafe(chars, offset, numBytes); + return; } -final char[] chars = new char[numBytes / 2]; -Platform.copyMemory( -heapMemory, address + readerIdx, chars, Platform.CHAR_ARRAY_OFFSET, numBytes); +Platform.copyMemory(heapMemory, address + readerIdx, chars, offset, numBytes); readerIndex = readerIdx + numBytes; -return chars; } - public long[] readLongsWithSizeEmbedded() { -final int numBytes = readPositiveVarInt(); + public char[] readCharsWithAlignedSize() { +final int numBytes = readPositiveAlignedVarInt(); +return readChars(numBytes); + } + + public long[] readLongs(int numBytes) { int readerIdx = readerIndex; +int numElements = numBytes >> 3; +final long[] longs = new long[numElements]; // use subtract to avoid overflow if (readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + streamReader.readToUnsafe(longs, 0, numElements); + return longs; } -final long[] longs = new long[numBytes / 8]; Platform.copyMemory( heapMemory, address + readerIdx, longs, Platform.LONG_ARRAY_OFFSET, numBytes); readerIndex = readerIdx + numBytes; return longs; } - public void readChars(char[] chars, int offset, int numBytes) { -final int readerIdx = readerIndex; -// use subtract to avoid overflow -if (BoundsChecking.BOUNDS_CHECKING_ENABLED && readerIdx > size - numBytes) { - throw new IndexOutOfBoundsException( - String.format( - "readerIdx(%d) + length(%d) exceeds size(%d): %s", readerIdx, numBytes, size, this)); + /** + * Bulk copy method. Copies {@code numBytes} bytes to target unsafe object and pointer. NOTE: This + * is a unsafe method, no check here, please be carefully. + */ + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +int remaining = size - readerIndex; +if (numBytes > remaining) { + streamReader.readToUnsafe(target, targetPointer, numBytes
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041437927 Hi @Munoon @LiangliangSui @theweipeng @PragmaTwice , this pr is ready for review, please take a look at it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1554889981 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Change this make this PR a little messy, how about renaming it in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041128594 New readBytesString are too large for inline: ![image](https://github.com/apache/incubator-fury/assets/12445254/e76ccf29-a30f-4eab-8b16-c817d553dd05) Old version: ![image](https://github.com/apache/incubator-fury/assets/12445254/458406d6-7a14-4de1-bd7c-fd698865ba67) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2041104708 Method spilit threshold: - 15 ``` Benchmark (bufferType) (objectType) (references) Mode CntScoreError Units UserTypeDeserializeSuite.fury_deserialize arraySTRUCT false thrpt 10 4333464.647 ± 247025.147 ops/s UserTypeDeserializeSuite.fury_deserialize arraySTRUCT true thrpt 10 3643508.209 ± 470967.165 ops/s UserTypeDeserializeSuite.fury_deserialize directBufferSTRUCT false thrpt 10 3907185.065 ± 267600.435 ops/s UserTypeDeserializeSuite.fury_deserialize directBufferSTRUCT true thrpt 10 3961404.504 ± 415918.311 ops/s ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2037714411 > This PR introduce 25% performance degeneration, we can''t merge it I managed to make it faster than version without streaming reading. I'll split those optimization into other PR, then we can merge this PR later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2036205425 This PR introduce 25% performance degeneration, we can''t merge it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550850495 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Make sense, I''ll update it later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550233226 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Maybe `limit`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550230543 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Do you have any suggestions? I think size is still fine, bytearrayOutputStream use count, ByteBuffer use capacity -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on code in PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#discussion_r1550066465 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -90,6 +92,7 @@ public final class MemoryBuffer { private int size; Review Comment: Since the `size` field did not represent actual buffer _size_, maybe it worth renaming it? WDYT @chaokunyang? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034915355 > > Hi @Munoon, I implemented streaming deserialization for input stream. Maybe it can be taken as an example for java.nio.Channel. The implementation is a littl tricky, maybe there is a better way. > > Hi, @chaokunyang I feel like I'm ready with my PR, but I'm not sure how to send it, since it is built on top of your PR. I think that I will just wait until this PR will be merged. Let me know if you have a better idea. > > Also, some methods of my implementation are fully copied from your `FuryInputStream` and some are partially. I think those can be extracted in the abstract class, but I'll leave it as is to make a PR simpler. Maybe it worth making a refactoring in a separate PR. Yep, we can wait this PR merged, than you can filing a new PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034897915 I think it's important to our users, so I plan to include this PR in our 0.5.0 release. The original streaming is error-prone when using, and not a real streaming implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034895369 Hi @Munoon @LiangliangSui @knutwannheden @PragmaTwice This PR is ready to review, could you help review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034892088 > ```java > public void increaseReaderIndex(int diff) { > int readerIdx = readerIndex + diff; > if (readerIdx < 0) { > throw new IndexOutOfBoundsException( > String.format( > "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", readerIdx, size)); > } else if (readerIdx > size) { > int readCount = streamReader.fillBuffer(diff); > if (diff > readCount) { > throw new IndexOutOfBoundsException( > String.format( > "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", readerIdx, size)); > } > } > this.readerIndex = readerIdx; > } > ``` Nice catch, I fixed it, but fill buffer by `streamReader.fillBuffer(readerIdx - size);` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034646795 Oh, and by the way, I've also received an exception when testing my channel in `MemoryBuffer#increaseReaderIndex` method. I guess, current implementation in this PR is incorrect, since the `MemoryBuffer#size` property didn't represent actual size of this buffer no more. I've fixed this method as follow: ```java public void increaseReaderIndex(int diff) { int readerIdx = readerIndex + diff; if (readerIdx < 0) { throw new IndexOutOfBoundsException( String.format( "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", readerIdx, size)); } else if (readerIdx > size) { int readCount = streamReader.fillBuffer(diff); if (diff > readCount) { throw new IndexOutOfBoundsException( String.format( "readerIndex: %d (expected: 0 <= readerIndex <= size(%d))", readerIdx, size)); } } this.readerIndex = readerIdx; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
Munoon commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034623840 > Hi @Munoon, I implemented streaming deserialization for input stream. Maybe it can be taken as an example for java.nio.Channel. The implementation is a littl tricky, maybe there is a better way. Hi, @chaokunyang I feel like I'm ready with my PR, but I'm not sure how to send it, since it is built on top of your PR. I think that I will just wait until this PR will be merged. Let me know if you have a better idea. Also, some methods of my implementation are fully copied from your `FuryInputStream` and some are partially. I think those can be extracted in the abstract class, but I'll leave it as is to make a PR simpler. Maybe it worth making a refactoring in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): native streaming mode deserialization [incubator-fury]
chaokunyang commented on PR #1451: URL: https://github.com/apache/incubator-fury/pull/1451#issuecomment-2034334074 Hi @Munoon, I implemented streaming deserialization for input stream. Maybe it can be taken as an example for java.nio.Channel. The implementation is a littl tricky, maybe there is a better way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org