This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git


The following commit(s) were added to refs/heads/main by this push:
     new 77896a04 feat(java): add protobuf serializer for message and byte string (#2213)
77896a04 is described below

commit 77896a044f3dcb9c5842174a5c23e6c7016979c3
Author: Shawn Yang <[email protected]>
AuthorDate: Sat May 10 00:44:39 2025 +0800

    feat(java): add protobuf serializer for message and byte string (#2213)
    
    ## What does this PR do?
    
    add protobuf serializer for message and byte string
    
    ## Related issues
    
    #1945
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/fury/issues/new/choose) describing the
    need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach a benchmark data here.
    -->
---
 java/benchmark/pom.xml                             |   5 +
 .../fury/benchmark/state/ProtoBuffersState.java    |   6 +-
 .../benchmark/state/ProtoBuffersStateTest.java     |   4 +-
 ...sStateTest.java => ProtobufSerializerTest.java} |  36 +++---
 .../org/apache/fury/resolver/ClassResolver.java    |   5 +
 .../fury/serializer/shim/ProtobufDispatcher.java   |  77 +++++++++++++
 .../fury-core/native-image.properties              |   1 +
 java/fury-extensions/pom.xml                       |   6 +
 .../fury/serializer/ByteStringSerializer.java      |  61 +++++++++++
 .../apache/fury/serializer/ProtobufSerializer.java | 121 +++++++++++++++++++++
 10 files changed, 299 insertions(+), 23 deletions(-)

diff --git a/java/benchmark/pom.xml b/java/benchmark/pom.xml
index 9a83eceb..a5eca611 100644
--- a/java/benchmark/pom.xml
+++ b/java/benchmark/pom.xml
@@ -65,6 +65,11 @@
       <artifactId>fury-test-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.fury</groupId>
+      <artifactId>fury-extensions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
diff --git a/java/benchmark/src/main/java/org/apache/fury/benchmark/state/ProtoBuffersState.java b/java/benchmark/src/main/java/org/apache/fury/benchmark/state/ProtoBuffersState.java
index 2ee83606..52d91aa0 100644
--- a/java/benchmark/src/main/java/org/apache/fury/benchmark/state/ProtoBuffersState.java
+++ b/java/benchmark/src/main/java/org/apache/fury/benchmark/state/ProtoBuffersState.java
@@ -180,6 +180,10 @@ public class ProtoBuffersState {
   }
 
   public static byte[] serializeSample(Sample sample) {
+    return buildSample(sample).toByteArray();
+  }
+
+  public static ProtoMessage.Sample buildSample(Sample sample) {
     ProtoMessage.Sample.Builder builder = ProtoMessage.Sample.newBuilder();
     builder.setIntValue(sample.intValue);
     builder.setLongValue(sample.longValue);
@@ -219,7 +223,7 @@ public class ProtoBuffersState {
       builder.addBooleanArray(sample.booleanArray[i]);
     }
     builder.setString(sample.string);
-    return builder.build().toByteArray();
+    return builder.build();
   }
 
   public static Sample deserializeSample(byte[] bytes) {
diff --git a/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java b/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java
index f3998cd5..34fc312d 100644
--- a/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java
+++ b/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java
@@ -27,7 +27,7 @@ import org.testng.annotations.Test;
 
 public class ProtoBuffersStateTest {
   @Test
-  public void testMediaContent() {
+  public void testSample() {
     Sample object = new Sample().populate(false);
     byte[] data = ProtoBuffersState.serializeSample(object);
     Sample sample = ProtoBuffersState.deserializeSample(data);
@@ -35,7 +35,7 @@ public class ProtoBuffersStateTest {
   }
 
   @Test
-  public void testSample() {
+  public void testMediaContent() {
     MediaContent object = new MediaContent().populate(false);
     byte[] data = ProtoBuffersState.serializeMediaContent(object);
     MediaContent mediaContent = 
ProtoBuffersState.deserializeMediaContent(data);
diff --git a/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java b/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtobufSerializerTest.java
similarity index 55%
copy from java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java
copy to java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtobufSerializerTest.java
index f3998cd5..10ea4b4c 100644
--- a/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtoBuffersStateTest.java
+++ b/java/benchmark/src/test/java/org/apache/fury/benchmark/state/ProtobufSerializerTest.java
@@ -19,34 +19,30 @@
 
 package org.apache.fury.benchmark.state;
 
-import org.apache.fury.benchmark.data.MediaContent;
+import com.google.protobuf.ByteString;
+import org.apache.fury.Fury;
 import org.apache.fury.benchmark.data.Sample;
-import 
org.apache.fury.benchmark.state.ProtoBuffersState.ProtoBuffersUserTypeState;
+import org.apache.fury.integration_tests.state.generated.ProtoMessage;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class ProtoBuffersStateTest {
-  @Test
-  public void testMediaContent() {
-    Sample object = new Sample().populate(false);
-    byte[] data = ProtoBuffersState.serializeSample(object);
-    Sample sample = ProtoBuffersState.deserializeSample(data);
-    Assert.assertEquals(sample, object);
-  }
-
+public class ProtobufSerializerTest {
   @Test
   public void testSample() {
-    MediaContent object = new MediaContent().populate(false);
-    byte[] data = ProtoBuffersState.serializeMediaContent(object);
-    MediaContent mediaContent = 
ProtoBuffersState.deserializeMediaContent(data);
-    Assert.assertEquals(mediaContent, object);
+    Sample object = new Sample().populate(false);
+    ProtoMessage.Sample samplePb = ProtoBuffersState.buildSample(object);
+    Fury fury = Fury.builder().requireClassRegistration(false).build();
+    fury.register(ProtoMessage.Sample.class);
+    byte[] bytes = fury.serialize(samplePb);
+    Object newObj = fury.deserialize(bytes);
+    Assert.assertEquals(newObj, samplePb);
   }
 
   @Test
-  public void testProtoBuffersUserTypeState() {
-    ProtoBuffersUserTypeState state = new ProtoBuffersUserTypeState();
-    state.objectType = ObjectType.SAMPLE;
-    state.bufferType = BufferType.array;
-    state.setup();
+  public void testByteString() {
+    Fury fury = Fury.builder().requireClassRegistration(false).build();
+    Assert.assertEquals(fury.deserialize(fury.serialize(ByteString.empty())), 
ByteString.empty());
+    ByteString bytes = ByteString.copyFrom(new byte[] {1, 2, 3});
+    Assert.assertEquals(fury.deserialize(fury.serialize(bytes)), bytes);
   }
 }
diff --git a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
index 5f631b7b..8b38aec5 100644
--- a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
+++ b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
@@ -153,6 +153,7 @@ import org.apache.fury.serializer.collection.UnmodifiableSerializers;
 import org.apache.fury.serializer.scala.SingletonCollectionSerializer;
 import org.apache.fury.serializer.scala.SingletonMapSerializer;
 import org.apache.fury.serializer.scala.SingletonObjectSerializer;
+import org.apache.fury.serializer.shim.ProtobufDispatcher;
 import org.apache.fury.serializer.shim.ShimDispatcher;
 import org.apache.fury.type.Descriptor;
 import org.apache.fury.type.DescriptorGrouper;
@@ -961,6 +962,10 @@ public class ClassResolver implements TypeResolver {
       if (shimDispatcher.contains(cls)) {
         return shimDispatcher.getSerializer(cls).getClass();
       }
+      serializerClass = ProtobufDispatcher.getSerializerClass(cls);
+      if (serializerClass != null) {
+        return serializerClass;
+      }
       if (fury.getConfig().checkJdkClassSerializable()) {
         if (cls.getName().startsWith("java") && 
!(Serializable.class.isAssignableFrom(cls))) {
           throw new UnsupportedOperationException(
diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/shim/ProtobufDispatcher.java b/java/fury-core/src/main/java/org/apache/fury/serializer/shim/ProtobufDispatcher.java
new file mode 100644
index 00000000..04bfb8e5
--- /dev/null
+++ b/java/fury-core/src/main/java/org/apache/fury/serializer/shim/ProtobufDispatcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.serializer.shim;
+
+import org.apache.fury.logging.Logger;
+import org.apache.fury.logging.LoggerFactory;
+import org.apache.fury.reflect.ReflectionUtils;
+import org.apache.fury.serializer.Serializer;
+import org.apache.fury.util.ExceptionUtils;
+
+/**
+ * Resolves serializers for protobuf types without a compile-time dependency on protobuf.
+ *
+ * <p>The protobuf classes ({@code com.google.protobuf.Message}, {@code ByteString}) and the
+ * serializers for them ({@code ProtobufSerializer}, {@code ByteStringSerializer}, shipped in
+ * fury-extensions) are all loaded reflectively. Missing classes simply disable dispatch.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class ProtobufDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufDispatcher.class);
+
+  private static Class<?> pbByteStringClass;
+  private static Class<? extends Serializer> pbByteStringSerializerClass;
+  private static Class<?> pbMessageClass;
+  private static Class<? extends Serializer> pbMessageSerializerClass;
+
+  static {
+    try {
+      pbMessageClass = ReflectionUtils.loadClass("com.google.protobuf.Message");
+      pbByteStringClass = ReflectionUtils.loadClass("com.google.protobuf.ByteString");
+    } catch (Exception | LinkageError e) {
+      // Protobuf is optional; leave the classes null when it is absent.
+      ExceptionUtils.ignore(e);
+    }
+    // The serializers reference protobuf types, so only probe for them when protobuf is present.
+    // LinkageError (e.g. NoClassDefFoundError) is not an Exception and must not escape this
+    // static initializer, or every later use of this class would fail.
+    if (pbMessageClass != null) {
+      try {
+        pbMessageSerializerClass =
+            (Class<? extends Serializer>)
+                ReflectionUtils.loadClass(
+                    Serializer.class.getPackage().getName() + "." + "ProtobufSerializer");
+        pbByteStringSerializerClass =
+            (Class<? extends Serializer>)
+                ReflectionUtils.loadClass(
+                    Serializer.class.getPackage().getName() + "." + "ByteStringSerializer");
+      } catch (Exception | LinkageError e) {
+        // fury-extensions is optional; getSerializerClass warns when a protobuf type needs it.
+        ExceptionUtils.ignore(e);
+      }
+    }
+  }
+
+  /**
+   * Returns the Fury serializer class for a protobuf {@code Message} or {@code ByteString} type.
+   *
+   * @param type the class being resolved
+   * @return the serializer class, or {@code null} if protobuf is not on the classpath, {@code
+   *     type} is not a protobuf {@code Message}/{@code ByteString}, or its serializer could not be
+   *     loaded (a warning is logged in the last case)
+   */
+  public static Class<? extends Serializer> getSerializerClass(Class<?> type) {
+    if (pbMessageClass == null) {
+      return null;
+    }
+    Class<? extends Serializer> serializerClass;
+    if (pbMessageClass.isAssignableFrom(type)) {
+      serializerClass = pbMessageSerializerClass;
+    } else if (pbByteStringClass != null && pbByteStringClass.isAssignableFrom(type)) {
+      serializerClass = pbByteStringSerializerClass;
+    } else {
+      // Not a protobuf type: nothing to dispatch and nothing to warn about.
+      return null;
+    }
+    if (serializerClass == null) {
+      LOG.warn("ProtobufSerializer not loaded, please add fury-extensions dependency.");
+    }
+    return serializerClass;
+  }
+}
diff --git a/java/fury-core/src/main/resources/META-INF/native-image/org.apache.fury/fury-core/native-image.properties b/java/fury-core/src/main/resources/META-INF/native-image/org.apache.fury/fury-core/native-image.properties
index 01170fe8..654ea39d 100644
--- a/java/fury-core/src/main/resources/META-INF/native-image/org.apache.fury/fury-core/native-image.properties
+++ b/java/fury-core/src/main/resources/META-INF/native-image/org.apache.fury/fury-core/native-image.properties
@@ -427,6 +427,7 @@ Args=--initialize-at-build-time=org.apache.fury.memory.MemoryBuffer,\
     org.apache.fury.serializer.LazySerializer,\
     org.apache.fury.serializer.LazySerializer$LazyObjectSerializer,\
     org.apache.fury.serializer.shim.ShimDispatcher,\
+    org.apache.fury.serializer.shim.ProtobufDispatcher,\
     org.apache.fury.shaded.org.codehaus.janino.IClass$1,\
     org.apache.fury.type.Descriptor$1,\
     org.apache.fury.type.Descriptor,\
diff --git a/java/fury-extensions/pom.xml b/java/fury-extensions/pom.xml
index 44d3bce5..73b7493b 100644
--- a/java/fury-extensions/pom.xml
+++ b/java/fury-extensions/pom.xml
@@ -60,6 +60,12 @@
       <version>${zstd.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.5</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.fury</groupId>
       <artifactId>fury-core</artifactId>
diff --git a/java/fury-extensions/src/main/java/org/apache/fury/serializer/ByteStringSerializer.java b/java/fury-extensions/src/main/java/org/apache/fury/serializer/ByteStringSerializer.java
new file mode 100644
index 00000000..81e6d130
--- /dev/null
+++ b/java/fury-extensions/src/main/java/org/apache/fury/serializer/ByteStringSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.serializer;
+
+import com.google.protobuf.ByteString;
+import org.apache.fury.Fury;
+import org.apache.fury.memory.MemoryBuffer;
+
+/**
+ * Serializer for protobuf {@link ByteString}.
+ *
+ * <p>Wire format: a var-uint32 byte length followed by the raw bytes.
+ */
+public class ByteStringSerializer extends Serializer<ByteString> {
+  public ByteStringSerializer(Fury fury, Class<ByteString> type) {
+    super(fury, type);
+  }
+
+  /** Writes the length prefix, then copies the bytes straight into the buffer's storage. */
+  @Override
+  public void write(MemoryBuffer buffer, ByteString value) {
+    int length = value.size();
+    buffer.writeVarUint32(length);
+    // Make sure the target region exists before copying into the raw storage below.
+    buffer.grow(length);
+    byte[] array = buffer.getHeapMemory();
+    if (array == null) {
+      value.copyTo(buffer.sliceAsByteBuffer(buffer.writerIndex(), length));
+    } else {
+      value.copyTo(array, buffer._unsafeHeapWriterIndex());
+    }
+    buffer.increaseWriterIndex(length);
+  }
+
+  /** Reads the length prefix, then copies that many bytes into a new {@link ByteString}. */
+  @Override
+  public ByteString read(MemoryBuffer buffer) {
+    int length = buffer.readVarUint32Small14();
+    buffer.checkReadableBytes(length);
+    byte[] array = buffer.getHeapMemory();
+    ByteString result =
+        array == null
+            ? ByteString.copyFrom(buffer.sliceAsByteBuffer(buffer.readerIndex(), length))
+            : ByteString.copyFrom(array, buffer._unsafeHeapReaderIndex(), length);
+    buffer.increaseReaderIndex(length);
+    return result;
+  }
+}
diff --git a/java/fury-extensions/src/main/java/org/apache/fury/serializer/ProtobufSerializer.java b/java/fury-extensions/src/main/java/org/apache/fury/serializer/ProtobufSerializer.java
new file mode 100644
index 00000000..4f967da9
--- /dev/null
+++ b/java/fury-extensions/src/main/java/org/apache/fury/serializer/ProtobufSerializer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.serializer;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.nio.ByteBuffer;
+import org.apache.fury.Fury;
+import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.memory.Platform;
+import org.apache.fury.util.unsafe._JDKAccess;
+
+/**
+ * Serializer for generated protobuf {@link Message} classes.
+ *
+ * <p>Wire format: a var-uint32 byte length followed by the message's protobuf encoding. Reading
+ * calls the static {@code parseFrom} methods of the concrete message class through method handles
+ * that are resolved once per class.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ProtobufSerializer extends Serializer<Message> {
+  /**
+   * Payloads smaller than this are copied through a temporary byte array when the buffer is
+   * off-heap, instead of going through a {@link ByteBuffer} slice.
+   */
+  private static final int SMALL_BYTES_THRESHOLD = 32;
+
+  /**
+   * Per-class cache of the static {@code parseFrom} overloads: index 0 is {@code
+   * parseFrom(CodedInputStream)}, index 1 is {@code parseFrom(ByteBuffer)}.
+   */
+  private static final ClassValue<MethodHandle[]> parseFromMethodCache =
+      new ClassValue<MethodHandle[]>() {
+        @Override
+        protected MethodHandle[] computeValue(Class<?> type) {
+          try {
+            MethodHandles.Lookup lookup = _JDKAccess._trustedLookup(type);
+            MethodHandle parseFromStream =
+                lookup.findStatic(
+                    type, "parseFrom", MethodType.methodType(type, CodedInputStream.class));
+            MethodHandle parseFromByteBuffer =
+                lookup.findStatic(type, "parseFrom", MethodType.methodType(type, ByteBuffer.class));
+            return new MethodHandle[] {parseFromStream, parseFromByteBuffer};
+          } catch (Exception e) {
+            throw new RuntimeException(
+                "Cannot resolve static parseFrom methods of protobuf message class " + type, e);
+          }
+        }
+      };
+
+  /** Handle to {@code parseFrom(CodedInputStream)} of the message class. */
+  private final MethodHandle parseFromStream;
+  /** Handle to {@code parseFrom(ByteBuffer)} of the message class. */
+  private final MethodHandle parseFromByteBuffer;
+
+  /**
+   * Creates a serializer for the protobuf message class {@code type}.
+   *
+   * @param fury the owning Fury instance
+   * @param type a message class with static {@code parseFrom(CodedInputStream)} and {@code
+   *     parseFrom(ByteBuffer)} methods
+   * @throws RuntimeException if those {@code parseFrom} methods cannot be resolved
+   */
+  public ProtobufSerializer(Fury fury, Class type) {
+    super(fury, type, true);
+    MethodHandle[] methodHandles = parseFromMethodCache.get(type);
+    this.parseFromStream = methodHandles[0];
+    // Index 1 is the ByteBuffer overload; the CodedInputStream overload at index 0 would throw
+    // WrongMethodTypeException when invoked with a ByteBuffer.
+    this.parseFromByteBuffer = methodHandles[1];
+  }
+
+  /** Writes {@code value} as a var-uint32 byte length followed by its protobuf encoding. */
+  @Override
+  public void write(MemoryBuffer buffer, Message value) {
+    int size = value.getSerializedSize();
+    buffer.writeVarUint32(size);
+    // Reserve the payload region up front; the encoders below write into it directly.
+    buffer.grow(size);
+    byte[] heapMemory = buffer.getHeapMemory();
+    try {
+      if (heapMemory != null) {
+        CodedOutputStream stream =
+            CodedOutputStream.newInstance(heapMemory, buffer._unsafeHeapWriterIndex(), size);
+        value.writeTo(stream);
+        buffer.increaseWriterIndex(size);
+      } else if (size < SMALL_BYTES_THRESHOLD) {
+        buffer.writeBytes(value.toByteArray());
+      } else {
+        // No backing array here, so slice by the logical writer index (as the read path does
+        // with readerIndex()) rather than the heap-array index.
+        ByteBuffer buf = buffer.sliceAsByteBuffer(buffer.writerIndex(), size);
+        CodedOutputStream stream = CodedOutputStream.newInstance(buf);
+        value.writeTo(stream);
+        stream.flush();
+        buffer.increaseWriterIndex(size);
+      }
+    } catch (IOException e) {
+      Platform.throwException(e);
+    }
+  }
+
+  /** Reads a message written by {@link #write(MemoryBuffer, Message)}. */
+  @Override
+  public Message read(MemoryBuffer buffer) {
+    int size = buffer.readVarUint32Small14();
+    buffer.checkReadableBytes(size);
+    byte[] heapMemory = buffer.getHeapMemory();
+    try {
+      if (heapMemory != null) {
+        CodedInputStream stream =
+            CodedInputStream.newInstance(heapMemory, buffer._unsafeHeapReaderIndex(), size);
+        buffer.increaseReaderIndex(size);
+        return (Message) parseFromStream.invoke(stream);
+      } else if (size < SMALL_BYTES_THRESHOLD) {
+        // parseFromStream only accepts a CodedInputStream, so wrap the copied bytes in one.
+        byte[] bytes = buffer.readBytes(size);
+        return (Message) parseFromStream.invoke(CodedInputStream.newInstance(bytes));
+      } else {
+        ByteBuffer byteBuffer = buffer.sliceAsByteBuffer(buffer.readerIndex(), size);
+        buffer.increaseReaderIndex(size);
+        return (Message) parseFromByteBuffer.invoke(byteBuffer);
+      }
+    } catch (Throwable e) {
+      // MethodHandle.invoke declares Throwable; rethrow parse failures unchecked.
+      throw new RuntimeException(e);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to