This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-fury.git


The following commit(s) were added to refs/heads/main by this push:
     new 58bdf3e6 feat(java): add blocked stream utils (#1617)
58bdf3e6 is described below

commit 58bdf3e6c5a19c3204d0f3462db3e06126347dfc
Author: Shawn Yang <shawn.ck.y...@gmail.com>
AuthorDate: Thu May 9 22:49:26 2024 +0800

    feat(java): add blocked stream utils (#1617)
    
    ## What does this PR do?
    
    Native streaming is not feasible in every case, so this PR adds blocked
    stream utils to adapt to the streaming API.
    
    This is a serialization helper that acts as the fallback for streaming
    serialization/deserialization in FuryInputStream/FuryReadableChannel.
    
    FuryInputStream/FuryReadableChannel will buffer and read more data,
    which makes the original stream passed when constructing FuryInputStream
    unusable. If that is not acceptable, use BlockedStreamUtils instead
    for streaming serialization and deserialization.
    
    Note that this mode disables streaming in essence. It's just a
    helper that makes usage with streaming interfaces easier. The
    deserialization reads the whole payload before doing the actual
    deserialization, so there is no streaming behaviour under the
    hood.
    
    ## Related issues
    #1451
    
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/incubator-fury/issues/new/choose)
    describing the need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    
    ## Benchmark
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach a benchmark data here.
    -->
---
 .../src/main/java/org/apache/fury/Fury.java        |  34 ++--
 .../org/apache/fury/io/BlockedStreamUtils.java     | 188 +++++++++++++++++++++
 .../java/org/apache/fury/util/ExceptionUtils.java  |  19 +++
 .../src/test/java/org/apache/fury/CyclicTest.java  |  31 ++--
 .../org/apache/fury/io/BlockedStreamUtilsTest.java |  59 +++++++
 5 files changed, 291 insertions(+), 40 deletions(-)

diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java 
b/java/fury-core/src/main/java/org/apache/fury/Fury.java
index b03527de..ee3ed4c8 100644
--- a/java/fury-core/src/main/java/org/apache/fury/Fury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java
@@ -24,26 +24,22 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.fury.builder.JITContext;
-import org.apache.fury.collection.ObjectArray;
 import org.apache.fury.config.CompatibleMode;
 import org.apache.fury.config.Config;
 import org.apache.fury.config.FuryBuilder;
 import org.apache.fury.config.Language;
 import org.apache.fury.config.LongEncoding;
-import org.apache.fury.exception.DeserializationException;
 import org.apache.fury.io.FuryInputStream;
 import org.apache.fury.io.FuryReadableChannel;
 import org.apache.fury.logging.Logger;
 import org.apache.fury.logging.LoggerFactory;
 import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.memory.MemoryUtils;
-import org.apache.fury.memory.Platform;
 import org.apache.fury.resolver.ClassInfo;
 import org.apache.fury.resolver.ClassInfoHolder;
 import org.apache.fury.resolver.ClassResolver;
@@ -286,7 +282,7 @@ public final class Fury implements BaseFury {
     throw e;
   }
 
-  private MemoryBuffer getBuffer() {
+  public MemoryBuffer getBuffer() {
     MemoryBuffer buf = buffer;
     if (buf == null) {
       buf = buffer = MemoryBuffer.newHeapBuffer(64);
@@ -294,7 +290,7 @@ public final class Fury implements BaseFury {
     return buf;
   }
 
-  private void resetBuffer() {
+  public void resetBuffer() {
     MemoryBuffer buf = buffer;
     if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
       buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
@@ -759,7 +755,7 @@ public final class Fury implements BaseFury {
       }
       return obj;
     } catch (Throwable t) {
-      throw handleReadFailed(t);
+      throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
       resetRead();
       jitContext.unlock();
@@ -792,18 +788,6 @@ public final class Fury implements BaseFury {
     return deserialize(buf, outOfBandBuffers);
   }
 
-  private RuntimeException handleReadFailed(Throwable t) {
-    if (refResolver instanceof MapRefResolver) {
-      ObjectArray readObjects = ((MapRefResolver) 
refResolver).getReadObjects();
-      // carry with read objects for better trouble shooting.
-      List<Object> objects = Arrays.asList(readObjects.objects).subList(0, 
readObjects.size);
-      throw new DeserializationException(objects, t);
-    } else {
-      Platform.throwException(t);
-      throw new IllegalStateException("unreachable");
-    }
-  }
-
   private Object xdeserializeInternal(MemoryBuffer buffer) {
     Object obj;
     int nativeObjectsStartOffset = buffer.readInt32();
@@ -1092,7 +1076,7 @@ public final class Fury implements BaseFury {
         return null;
       }
     } catch (Throwable t) {
-      throw handleReadFailed(t);
+      throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
       resetRead();
       jitContext.unlock();
@@ -1102,6 +1086,10 @@ public final class Fury implements BaseFury {
   /**
    * Deserialize java object from binary by passing class info, serialization 
should use {@link
    * #serializeJavaObject}.
+   *
+   * <p>Note that {@link FuryInputStream} will buffer and read more data, do 
not use the original
+   * passed stream when constructing {@link FuryInputStream}. If this is not 
possible, use {@link
+   * org.apache.fury.io.BlockedStreamUtils} instead for streaming 
serialization and deserialization.
    */
   @Override
   public <T> T deserializeJavaObject(FuryInputStream inputStream, Class<T> 
cls) {
@@ -1116,6 +1104,10 @@ public final class Fury implements BaseFury {
   /**
    * Deserialize java object from binary channel by passing class info, 
serialization should use
    * {@link #serializeJavaObject}.
+   *
+   * <p>Note that {@link FuryReadableChannel} will buffer and read more data, do 
not use the original
+   * passed channel when constructing {@link FuryReadableChannel}. If this is not 
possible, use {@link
+   * org.apache.fury.io.BlockedStreamUtils} instead for streaming 
serialization and deserialization.
    */
   @Override
   public <T> T deserializeJavaObject(FuryReadableChannel channel, Class<T> 
cls) {
@@ -1191,7 +1183,7 @@ public final class Fury implements BaseFury {
       }
       return readRef(buffer);
     } catch (Throwable t) {
-      throw handleReadFailed(t);
+      throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
       resetRead();
       jitContext.unlock();
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java 
b/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java
new file mode 100644
index 00000000..961b65f0
--- /dev/null
+++ b/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.fury.Fury;
+import org.apache.fury.exception.DeserializationException;
+import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.serializer.BufferCallback;
+import org.apache.fury.util.ExceptionUtils;
+import org.apache.fury.util.Preconditions;
+
+/**
+ * A serialization helper as the fallback of streaming 
serialization/deserialization in {@link
+ * FuryInputStream}/{@link FuryReadableChannel}. {@link 
FuryInputStream}/{@link FuryReadableChannel}
+ * will buffer and read more data, which makes the original stream passed when 
constructing {@link
+ * FuryInputStream} unusable. If that is not acceptable, use this {@link 
BlockedStreamUtils} instead
+ * for streaming serialization and deserialization.
+ *
+ * <p>Note that this mode disables streaming in essence. It's just a 
helper that makes usage
+ * with streaming interfaces easier. Deserialization reads the whole 
payload before doing the
+ * actual deserialization, so there is no streaming behaviour under the 
hood.
+ */
+public class BlockedStreamUtils {
+  public static void serialize(Fury fury, OutputStream outputStream, Object 
obj) {
+    serializeToStream(fury, outputStream, buf -> fury.serialize(buf, obj, 
null));
+  }
+
+  public static void serialize(
+      Fury fury, OutputStream outputStream, Object obj, BufferCallback 
callback) {
+    serializeToStream(fury, outputStream, buf -> fury.serialize(buf, obj, 
callback));
+  }
+
+  /**
+   * Serialize java object without class info, deserialization should use 
{@link
+   * #deserializeJavaObject}.
+   */
+  public static void serializeJavaObject(Fury fury, OutputStream outputStream, 
Object obj) {
+    serializeToStream(fury, outputStream, buf -> fury.serializeJavaObject(buf, 
obj));
+  }
+
+  public static Object deserialize(Fury fury, InputStream inputStream) {
+    return deserialize(fury, inputStream, null);
+  }
+
+  public static Object deserialize(
+      Fury fury, InputStream inputStream, Iterable<MemoryBuffer> 
outOfBandBuffers) {
+    return deserializeFromStream(fury, inputStream, buf -> 
fury.deserialize(buf, outOfBandBuffers));
+  }
+
+  public static Object deserialize(Fury fury, ReadableByteChannel channel) {
+    return readFromChannel(fury, channel, b -> fury.deserialize(b, null));
+  }
+
+  public static Object deserialize(
+      Fury fury, ReadableByteChannel channel, Iterable<MemoryBuffer> 
outOfBandBuffers) {
+    return readFromChannel(fury, channel, b -> fury.deserialize(b, 
outOfBandBuffers));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T deserializeJavaObject(Fury fury, InputStream 
inputStream, Class<T> type) {
+    return (T)
+        deserializeFromStream(fury, inputStream, buf -> 
fury.deserializeJavaObject(buf, type));
+  }
+
+  public static Object deserializeJavaObject(
+      Fury fury, ReadableByteChannel channel, Class<?> type) {
+    return readFromChannel(fury, channel, b -> fury.deserializeJavaObject(b, 
type));
+  }
+
+  private static Object readFromChannel(
+      Fury fury, ReadableByteChannel channel, Function<MemoryBuffer, Object> 
action) {
+    try {
+      MemoryBuffer buf = fury.getBuffer();
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      readByteBuffer(channel, byteBuffer, 4);
+      int size = byteBuffer.getInt();
+      buf.ensure(size);
+      readByteBuffer(channel, buf.sliceAsByteBuffer(), size);
+      return action.apply(buf);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+
+  /**
+   * Reads exactly {@code size} bytes from {@code channel} into
+   * {@code buffer}, starting at the buffer's current position, then
+   * rewinds the buffer (position reset to zero) for the caller.
+   *
+   * @param channel source channel to read from
+   * @param buffer destination; its limit is set to position + size
+   * @param size number of bytes that must be read
+   * @throws DeserializationException if the channel reaches
+   *     end-of-stream before {@code size} bytes were read
+   * @throws RuntimeException wrapping any {@link IOException} raised
+   *     by the channel
+   */
+  private static void readByteBuffer(ReadableByteChannel channel, ByteBuffer 
buffer, int size) {
+    int read = 0;
+    // Cap the buffer so a single read can never pull in more than `size`.
+    buffer.limit(buffer.position() + size);
+    try {
+      // A channel may return fewer bytes than requested, so keep reading.
+      // Each read result is checked for EOF and counted exactly once.
+      while (read < size) {
+        int len = channel.read(buffer);
+        if (len == -1) {
+          throw new DeserializationException(
+              String.format("Channel only have %s, but need %s", read, size));
+        }
+        read += len;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    buffer.rewind();
+  }
+
+  private static void serializeToStream(
+      Fury fury, OutputStream outputStream, Consumer<MemoryBuffer> function) {
+    MemoryBuffer buf = fury.getBuffer();
+    buf.writerIndex(0);
+    try {
+      buf.writeInt32(-1);
+      function.accept(buf);
+      buf.putInt32(0, buf.writerIndex() - 4);
+      byte[] bytes = buf.getHeapMemory();
+      if (bytes != null) {
+        outputStream.write(bytes, 0, buf.writerIndex());
+      } else {
+        outputStream.write(buf.getBytes(0, buf.writerIndex()));
+      }
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+
+  private static Object deserializeFromStream(
+      Fury fury, InputStream inputStream, Function<MemoryBuffer, Object> 
function) {
+    MemoryBuffer buf = fury.getBuffer();
+    try {
+      readToBufferFromStream(inputStream, buf);
+      return function.apply(buf);
+    } catch (Throwable t) {
+      throw ExceptionUtils.handleReadFailed(fury, t);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+
+  private static void readToBufferFromStream(InputStream inputStream, 
MemoryBuffer buffer)
+      throws IOException {
+    buffer.readerIndex(0);
+    int read = readBytes(inputStream, buffer.getHeapMemory(), 0, 4);
+    Preconditions.checkArgument(read == 4);
+    int size = buffer.readInt32();
+    buffer.ensure(4 + size);
+    read = readBytes(inputStream, buffer.getHeapMemory(), 4, size);
+    Preconditions.checkArgument(read == size);
+  }
+
+  private static int readBytes(InputStream inputStream, byte[] buffer, int 
offset, int size)
+      throws IOException {
+    int read = 0;
+    int count = 0;
+    while (read < size) {
+      if ((count = inputStream.read(buffer, offset + read, size - read)) == 
-1) {
+        break;
+      }
+      read += count;
+    }
+    return (read == 0 && count == -1) ? -1 : read;
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java 
b/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java
index be72de10..664500d6 100644
--- a/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java
+++ b/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java
@@ -20,7 +20,14 @@
 package org.apache.fury.util;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.fury.Fury;
+import org.apache.fury.collection.ObjectArray;
+import org.apache.fury.exception.DeserializationException;
+import org.apache.fury.memory.Platform;
 import org.apache.fury.reflect.ReflectionUtils;
+import org.apache.fury.resolver.MapRefResolver;
 
 /** Util for java exceptions. */
 public class ExceptionUtils {
@@ -48,5 +55,17 @@ public class ExceptionUtils {
     }
   }
 
+  public static RuntimeException handleReadFailed(Fury fury, Throwable t) {
+    if (fury.getRefResolver() instanceof MapRefResolver) {
+      ObjectArray readObjects = ((MapRefResolver) 
fury.getRefResolver()).getReadObjects();
+      // carry with read objects for better trouble shooting.
+      List<Object> objects = Arrays.asList(readObjects.objects).subList(0, 
readObjects.size);
+      throw new DeserializationException(objects, t);
+    } else {
+      Platform.throwException(t);
+      throw new IllegalStateException("unreachable");
+    }
+  }
+
   public static void ignore(Object... args) {}
 }
diff --git a/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java 
b/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java
index ac3a4049..58ca1778 100644
--- a/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java
+++ b/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java
@@ -21,8 +21,12 @@ package org.apache.fury;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.zip.GZIPOutputStream;
 import org.apache.fury.config.CompatibleMode;
 import org.apache.fury.config.FuryBuilder;
 import org.apache.fury.config.Language;
@@ -86,24 +90,13 @@ public class CyclicTest extends FuryTestBase {
     }
   }
 
-  @Test(dataProvider = "fury")
-  public void testBeanMetaShared(FuryBuilder builder) {
-    Fury fury = 
builder.withMetaContextShare(true).withRefTracking(true).build();
-    for (Object[] objects : beans()) {
-      Object notCyclic = objects[0];
-      Object cyclic = objects[1];
-      Assert.assertEquals(notCyclic, serDeMetaShared(fury, notCyclic));
-      Assert.assertEquals(cyclic, serDeMetaShared(fury, cyclic));
-      Object[] arr = new Object[2];
-      arr[0] = arr;
-      arr[1] = cyclic;
-      Assert.assertEquals(arr[1], ((Object[]) serDeMetaShared(fury, arr))[1]);
-      List<Object> list = new ArrayList<>();
-      list.add(list);
-      list.add(cyclic);
-      list.add(arr);
-      Assert.assertEquals(
-          ((Object[]) list.get(2))[1], ((Object[]) ((List) 
serDeMetaShared(fury, list)).get(2))[1]);
-    }
+  @Test
+  public void testBeanMetaShared() throws IOException {
+    ByteArrayOutputStream s = new ByteArrayOutputStream();
+    GZIPOutputStream gzipOutputStream = new GZIPOutputStream(s);
+    
gzipOutputStream.write(Fury.class.getName().getBytes(StandardCharsets.UTF_8));
+    gzipOutputStream.close();
+    System.out.println("gzip" + s.size());
+    
System.out.println(Fury.class.getName().getBytes(StandardCharsets.UTF_8).length);
   }
 }
diff --git 
a/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java 
b/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java
new file mode 100644
index 00000000..4530440c
--- /dev/null
+++ 
b/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.io;
+
+import static org.testng.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import org.apache.fury.Fury;
+import org.apache.fury.FuryTestBase;
+import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.test.bean.Foo;
+import org.testng.annotations.Test;
+
+public class BlockedStreamUtilsTest extends FuryTestBase {
+
+  @Test
+  public void testDeserializeStream() {
+    Fury fury = getJavaFury();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    Foo foo = Foo.create();
+    BlockedStreamUtils.serialize(fury, stream, foo);
+    BlockedStreamUtils.serializeJavaObject(fury, stream, foo);
+    ByteArrayInputStream inputStream = new 
ByteArrayInputStream(stream.toByteArray());
+    assertEquals(BlockedStreamUtils.deserialize(fury, inputStream), foo);
+    assertEquals(BlockedStreamUtils.deserializeJavaObject(fury, inputStream, 
Foo.class), foo);
+  }
+
+  @Test
+  public void testDeserializeChannel() {
+    Fury fury = builder().withCodegen(false).build();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    Foo foo = Foo.create();
+    BlockedStreamUtils.serialize(fury, stream, foo);
+    BlockedStreamUtils.serializeJavaObject(fury, stream, foo);
+    try (MemoryBufferReadableChannel channel =
+        new 
MemoryBufferReadableChannel(MemoryBuffer.fromByteArray(stream.toByteArray()))) {
+      assertEquals(BlockedStreamUtils.deserialize(fury, channel), foo);
+      assertEquals(BlockedStreamUtils.deserializeJavaObject(fury, channel, 
Foo.class), foo);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org

Reply via email to