This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-fury.git


The following commit(s) were added to refs/heads/main by this push:
     new da5f8473 feat(java): support meta compression by Deflater (#1663)
da5f8473 is described below

commit da5f8473818bc2768b4f3b5a42c39a2c7fff8120
Author: Shawn Yang <shawn.ck.y...@gmail.com>
AuthorDate: Sat Jun 1 15:47:43 2024 +0800

    feat(java): support meta compression by Deflater (#1663)
    
    ## What does this PR do?
    
    This PR supports meta compression and adds Deflater as the default compressor.
    
    In our test, compressing meta reduces the size by **243** without
    introducing any performance cost:
    
    ```
    before:
    Fury | STRUCT | false | array | 1227 |
    
    after:
    STRUCT | false | array | 984 |
    
    ```
    
    ## Related issues
    #1660
    
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/incubator-fury/issues/new/choose)
    describing the need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    
    ## Benchmark
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach a benchmark data here.
    -->
---
 docs/guide/java_serialization_guide.md             |  4 +-
 .../main/java/org/apache/fury/config/Config.java   | 13 ++++
 .../java/org/apache/fury/config/FuryBuilder.java   | 13 ++++
 .../main/java/org/apache/fury/meta/ClassDef.java   |  1 +
 .../java/org/apache/fury/meta/ClassDefDecoder.java | 28 +++++---
 .../java/org/apache/fury/meta/ClassDefEncoder.java | 80 +++++++++++++---------
 .../apache/fury/meta/DeflaterMetaCompressor.java   | 72 +++++++++++++++++++
 .../java/org/apache/fury/meta/MetaCompressor.java  | 53 ++++++++++++++
 .../apache/fury/meta/TypeEqualMetaCompressor.java  | 67 ++++++++++++++++++
 .../main/java/org/apache/fury/util/MathUtils.java  |  4 ++
 .../org/apache/fury/config/FuryBuilderTest.java    | 75 ++++++++++++++++++++
 11 files changed, 368 insertions(+), 42 deletions(-)

diff --git a/docs/guide/java_serialization_guide.md 
b/docs/guide/java_serialization_guide.md
index 4869d836..2cca181a 100644
--- a/docs/guide/java_serialization_guide.md
+++ b/docs/guide/java_serialization_guide.md
@@ -108,7 +108,9 @@ public class Example {
 | `registerGuavaTypes`                | Whether to pre-register Guava types 
such as `RegularImmutableMap`/`RegularImmutableList`. These types are not 
public API, but seem pretty stable.                                             
                                                                                
                                                                                
                                                                                
                       [...]
 | `requireClassRegistration`          | Disabling may allow unknown classes to 
be deserialized, potentially causing security risks.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | `suppressClassRegistrationWarnings` | Whether to suppress class registration 
warnings. The warnings can be used for security audit, but may be annoying, 
this suppression will be enabled by default.                                    
                                                                                
                                                                                
                                                                                
                  [...]
-| `shareMetaContext`                  | Enables or disables meta share mode.   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| `metaShareEnabled`                  | Enables or disables meta share mode.   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| `scopedMetaShareEnabled`            | Scoped meta share focuses on a single 
serialization process. Metadata created or identified during this process is 
exclusive to it and is not shared with other serializations.                    
                                                                                
                                                                                
                                                                                
                  [...]
+| `metaCompressor`                    | Set a compressor for meta compression. 
Note that the passed MetaCompressor should be thread-safe. By default, a 
`Deflater` based compressor `DeflaterMetaCompressor` will be used. Users can 
pass another compressor such as `zstd` for a better compression rate.           
                                                                                
                                                                                
                        [...]
 | `deserializeNonexistentClass`       | Enables or disables 
deserialization/skipping of data for non-existent classes.                      
                                                                                
                                                                                
                                                                                
                                                                                
                                 [...]
 | `codeGenEnabled`                    | Disabling may result in faster initial 
serialization but slower subsequent serializations.                             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | `asyncCompilationEnabled`           | If enabled, serialization uses 
interpreter mode first and switches to JIT serialization after async serializer 
JIT for a class is finished.                                                    
                                                                                
                                                                                
                                                                                
                      [...]
diff --git a/java/fury-core/src/main/java/org/apache/fury/config/Config.java 
b/java/fury-core/src/main/java/org/apache/fury/config/Config.java
index 1a2c95c2..4e64e31f 100644
--- a/java/fury-core/src/main/java/org/apache/fury/config/Config.java
+++ b/java/fury-core/src/main/java/org/apache/fury/config/Config.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.fury.Fury;
+import org.apache.fury.meta.MetaCompressor;
 import org.apache.fury.serializer.Serializer;
 import org.apache.fury.serializer.TimeSerializers;
 import org.apache.fury.util.Preconditions;
@@ -51,6 +52,7 @@ public class Config implements Serializable {
   private final boolean registerGuavaTypes;
   private final boolean metaShareEnabled;
   private final boolean scopedMetaShareEnabled;
+  private final MetaCompressor metaCompressor;
   private final boolean asyncCompilationEnabled;
   private final boolean deserializeNonexistentClass;
   private final boolean scalaOptimizationEnabled;
@@ -77,6 +79,7 @@ public class Config implements Serializable {
     defaultJDKStreamSerializerType = builder.defaultJDKStreamSerializerType;
     metaShareEnabled = builder.metaShareEnabled;
     scopedMetaShareEnabled = builder.scopedMetaShareEnabled;
+    metaCompressor = builder.metaCompressor;
     deserializeNonexistentClass = builder.deserializeNonexistentClass;
     if (deserializeNonexistentClass) {
       // Only in meta share mode or compatibleMode, fury knows how to 
deserialize
@@ -192,6 +195,14 @@ public class Config implements Serializable {
     return scopedMetaShareEnabled;
   }
 
+  /**
+   * Returns a {@link MetaCompressor} to compress class metadata such as field 
names and types. The
+   * returned {@link MetaCompressor} should be thread safe.
+   */
+  public MetaCompressor getMetaCompressor() {
+    return metaCompressor;
+  }
+
   /**
    * Whether deserialize/skip data of un-existed class. If not enabled, an 
exception will be thrown
    * if class not exist.
@@ -247,6 +258,7 @@ public class Config implements Serializable {
         && registerGuavaTypes == config.registerGuavaTypes
         && metaShareEnabled == config.metaShareEnabled
         && scopedMetaShareEnabled == config.scopedMetaShareEnabled
+        && Objects.equals(metaCompressor, config.metaCompressor)
         && asyncCompilationEnabled == config.asyncCompilationEnabled
         && deserializeNonexistentClass == config.deserializeNonexistentClass
         && scalaOptimizationEnabled == config.scalaOptimizationEnabled
@@ -278,6 +290,7 @@ public class Config implements Serializable {
         registerGuavaTypes,
         metaShareEnabled,
         scopedMetaShareEnabled,
+        metaCompressor,
         asyncCompilationEnabled,
         deserializeNonexistentClass,
         scalaOptimizationEnabled);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java 
b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
index 419e3212..b7c93d87 100644
--- a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
+++ b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
@@ -27,6 +27,8 @@ import org.apache.fury.ThreadSafeFury;
 import org.apache.fury.logging.Logger;
 import org.apache.fury.logging.LoggerFactory;
 import org.apache.fury.memory.Platform;
+import org.apache.fury.meta.DeflaterMetaCompressor;
+import org.apache.fury.meta.MetaCompressor;
 import org.apache.fury.pool.ThreadPoolFury;
 import org.apache.fury.resolver.ClassResolver;
 import org.apache.fury.serializer.JavaSerializer;
@@ -77,6 +79,7 @@ public final class FuryBuilder {
   boolean scalaOptimizationEnabled = false;
   boolean suppressClassRegistrationWarnings = true;
   boolean deserializeNonexistentEnumValueAsNull = false;
+  MetaCompressor metaCompressor = new DeflaterMetaCompressor();
 
   public FuryBuilder() {}
 
@@ -251,6 +254,16 @@ public final class FuryBuilder {
     return this;
   }
 
+  /**
+   * Set a compressor for meta compression. Note that the passed {@link 
MetaCompressor} should be
+   * thread-safe. By default, a `Deflater` based compressor {@link 
DeflaterMetaCompressor} will be
+   * used. Users can pass other compressor such as `zstd` for better 
compression rate.
+   */
+  public FuryBuilder withMetaCompressor(MetaCompressor metaCompressor) {
+    this.metaCompressor = MetaCompressor.checkMetaCompressor(metaCompressor);
+    return this;
+  }
+
   /**
    * Whether deserialize/skip data of un-existed class.
    *
diff --git a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDef.java 
b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDef.java
index 899890bf..40062660 100644
--- a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDef.java
+++ b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDef.java
@@ -79,6 +79,7 @@ public class ClassDef implements Serializable {
   static final int SCHEMA_COMPATIBLE_FLAG = 0b10000;
   public static final int SIZE_TWO_BYTES_FLAG = 0b100000;
   static final int OBJECT_TYPE_FLAG = 0b1000000;
+  static final int COMPRESSION_FLAG = 0b10000000;
   // TODO use field offset to sort field, which will hit l1-cache more. Since
   // `objectFieldOffset` is not part of jvm-specification, it may change 
between different jdk
   // vendor. But the deserialization peer use the class definition to create 
deserializer, it's OK
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefDecoder.java 
b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefDecoder.java
index 88629e49..154654ca 100644
--- a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefDecoder.java
+++ b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefDecoder.java
@@ -19,6 +19,7 @@
 
 package org.apache.fury.meta;
 
+import static org.apache.fury.meta.ClassDef.COMPRESSION_FLAG;
 import static org.apache.fury.meta.ClassDef.SIZE_TWO_BYTES_FLAG;
 import static org.apache.fury.meta.Encoders.fieldNameEncodings;
 import static org.apache.fury.meta.Encoders.pkgEncodings;
@@ -50,33 +51,42 @@ class ClassDefDecoder {
       size = buffer.readByte() & 0xff;
       encoded.writeByte(size);
     }
-    buffer.checkReadableBytes(size);
-    encoded.writeBytes(buffer.getBytes(buffer.readerIndex(), size));
+    byte[] encodedClassDef = buffer.readBytes(size);
+    encoded.writeBytes(encodedClassDef);
+    if ((id & COMPRESSION_FLAG) != 0) {
+      encodedClassDef =
+          classResolver
+              .getFury()
+              .getConfig()
+              .getMetaCompressor()
+              .decompress(encodedClassDef, 0, size);
+    }
+    MemoryBuffer classDefBuf = MemoryBuffer.fromByteArray(encodedClassDef);
     long header = id & 0xff;
     int numClasses = (int) (header & 0b1111);
     if (numClasses == 0b1111) {
-      numClasses += buffer.readVarUint32Small7();
+      numClasses += classDefBuf.readVarUint32Small7();
     }
     numClasses += 1;
-    String className = null;
+    String className;
     List<ClassDef.FieldInfo> classFields = new ArrayList<>();
     ClassSpec classSpec = null;
     for (int i = 0; i < numClasses; i++) {
       // | num fields + register flag | header + package name | header + class 
name
       // | header + type id + field name | next field info | ... |
-      int currentClassHeader = buffer.readVarUint32Small7();
+      int currentClassHeader = classDefBuf.readVarUint32Small7();
       boolean isRegistered = (currentClassHeader & 0b1) != 0;
       int numFields = currentClassHeader >>> 1;
       if (isRegistered) {
-        int registeredId = buffer.readVarUint32Small7();
+        int registeredId = classDefBuf.readVarUint32Small7();
         className = classResolver.getClassInfo((short) 
registeredId).getCls().getName();
       } else {
-        String pkg = readPkgName(buffer);
-        String typeName = readTypeName(buffer);
+        String pkg = readPkgName(classDefBuf);
+        String typeName = readTypeName(classDefBuf);
         classSpec = Encoders.decodePkgAndClass(pkg, typeName);
         className = classSpec.entireClassName;
       }
-      List<ClassDef.FieldInfo> fieldInfos = readFieldsInfo(buffer, className, 
numFields);
+      List<ClassDef.FieldInfo> fieldInfos = readFieldsInfo(classDefBuf, 
className, numFields);
       classFields.addAll(fieldInfos);
     }
     Preconditions.checkNotNull(classSpec);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefEncoder.java 
b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefEncoder.java
index 078f85cf..46e2becf 100644
--- a/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefEncoder.java
+++ b/java/fury-core/src/main/java/org/apache/fury/meta/ClassDefEncoder.java
@@ -19,12 +19,14 @@
 
 package org.apache.fury.meta;
 
+import static org.apache.fury.meta.ClassDef.COMPRESSION_FLAG;
 import static org.apache.fury.meta.ClassDef.OBJECT_TYPE_FLAG;
 import static org.apache.fury.meta.ClassDef.SCHEMA_COMPATIBLE_FLAG;
 import static org.apache.fury.meta.ClassDef.SIZE_TWO_BYTES_FLAG;
 import static org.apache.fury.meta.Encoders.fieldNameEncodingsList;
 import static org.apache.fury.meta.Encoders.pkgEncodingsList;
 import static org.apache.fury.meta.Encoders.typeNameEncodingsList;
+import static org.apache.fury.util.MathUtils.toInt;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -124,20 +126,7 @@ class ClassDefEncoder {
       Class<?> type,
       Map<String, List<FieldInfo>> classLayers,
       boolean isObjectType) {
-    MemoryBuffer buffer = MemoryUtils.buffer(32);
-    buffer.increaseWriterIndex(9); // header + one byte size
-    long header;
-    int encodedSize = classLayers.size() - 1; // num class must be greater 
than 0
-    if (encodedSize > 0b1110) {
-      header = 0b1111;
-      buffer.writeVarUint32Small7(encodedSize - 0b1110);
-    } else {
-      header = encodedSize;
-    }
-    header |= SCHEMA_COMPATIBLE_FLAG;
-    if (isObjectType) {
-      header |= OBJECT_TYPE_FLAG;
-    }
+    MemoryBuffer classDefBuf = MemoryBuffer.newHeapBuffer(128);
     for (Map.Entry<String, List<FieldInfo>> entry : classLayers.entrySet()) {
       String className = entry.getKey();
       List<FieldInfo> fields = entry.getValue();
@@ -146,36 +135,63 @@ class ClassDefEncoder {
       int currentClassHeader = (fields.size() << 1);
       if (classResolver.isRegistered(type)) {
         currentClassHeader |= 1;
-        buffer.writeVarUint32Small7(currentClassHeader);
-        buffer.writeVarUint32Small7(classResolver.getRegisteredClassId(type));
+        classDefBuf.writeVarUint32Small7(currentClassHeader);
+        
classDefBuf.writeVarUint32Small7(classResolver.getRegisteredClassId(type));
       } else {
-        buffer.writeVarUint32Small7(currentClassHeader);
+        classDefBuf.writeVarUint32Small7(currentClassHeader);
         Class<?> currentType = getType(type, className);
         Tuple2<String, String> encoded = 
Encoders.encodePkgAndClass(currentType);
-        writePkgName(buffer, encoded.f0);
-        writeTypeName(buffer, encoded.f1);
+        writePkgName(classDefBuf, encoded.f0);
+        writeTypeName(classDefBuf, encoded.f1);
       }
-      writeFieldsInfo(buffer, fields);
+      writeFieldsInfo(classDefBuf, fields);
+    }
+    byte[] compressed =
+        classResolver
+            .getFury()
+            .getConfig()
+            .getMetaCompressor()
+            .compress(classDefBuf.getHeapMemory(), 0, 
classDefBuf.writerIndex());
+    boolean isCompressed = false;
+    if (compressed.length < classDefBuf.writerIndex()) {
+      isCompressed = true;
+      classDefBuf = MemoryBuffer.fromByteArray(compressed);
+      classDefBuf.writerIndex(compressed.length);
+    }
+    long hash =
+        MurmurHash3.murmurhash3_x64_128(
+            classDefBuf.getHeapMemory(), 0, classDefBuf.writerIndex(), 47)[0];
+    long header;
+    int numClasses = classLayers.size() - 1; // num class must be greater than 0
+    if (numClasses > 0b1110) {
+      header = 0b1111;
+    } else {
+      header = numClasses;
+    }
+    header |= SCHEMA_COMPATIBLE_FLAG;
+    if (isObjectType) {
+      header |= OBJECT_TYPE_FLAG;
+    }
+    if (isCompressed) {
+      header |= COMPRESSION_FLAG;
     }
-    byte[] encodedClassDef = buffer.getBytes(0, buffer.writerIndex());
-    long hash = MurmurHash3.murmurhash3_x64_128(encodedClassDef, 0, 
encodedClassDef.length, 47)[0];
     // this id will be part of generated codec, a negative number won't be 
allowed in class name.
     hash <<= 8;
     header |= Math.abs(hash);
-    int len = buffer.writerIndex() - 9;
+    MemoryBuffer buffer = MemoryUtils.buffer(classDefBuf.writerIndex() + 10);
+    int len = classDefBuf.writerIndex() + toInt(numClasses > 0b1110);
     if (len > 255) {
       header |= SIZE_TWO_BYTES_FLAG;
-    }
-    buffer.putInt64(0, header);
-    if (len > 255) {
-      MemoryBuffer buf = MemoryBuffer.newHeapBuffer(len + 1);
-      buf.writeInt64(header);
-      buf.writeInt16((short) len);
-      buf.writeBytes(buffer.getBytes(9, len));
-      buffer = buf;
+      buffer.writeInt64(header);
+      buffer.writeInt16((short) len);
     } else {
-      buffer.putByte(8, (byte) len);
+      buffer.writeInt64(header);
+      buffer.writeByte(len);
+    }
+    if (numClasses > 0b1110) {
+      buffer.writeVarUint32Small7(numClasses - 0b1110);
     }
+    buffer.writeBytes(classDefBuf.getHeapMemory(), 0, 
classDefBuf.writerIndex());
     return buffer;
   }
 
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/meta/DeflaterMetaCompressor.java 
b/java/fury-core/src/main/java/org/apache/fury/meta/DeflaterMetaCompressor.java
new file mode 100644
index 00000000..c875594d
--- /dev/null
+++ 
b/java/fury-core/src/main/java/org/apache/fury/meta/DeflaterMetaCompressor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.meta;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/** A meta compressor based on {@link Deflater} compression algorithm. */
+public class DeflaterMetaCompressor implements MetaCompressor {
+  @Override
+  public byte[] compress(byte[] input, int offset, int size) {
+    Deflater deflater = new Deflater();
+    deflater.setInput(input, offset, size);
+    deflater.finish();
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    byte[] buffer = new byte[128];
+    while (!deflater.finished()) {
+      int compressedSize = deflater.deflate(buffer);
+      outputStream.write(buffer, 0, compressedSize);
+    }
+    return outputStream.toByteArray();
+  }
+
+  @Override
+  public byte[] decompress(byte[] input, int offset, int size) {
+    Inflater inflater = new Inflater();
+    inflater.setInput(input, offset, size);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    byte[] buffer = new byte[128];
+    try {
+      while (!inflater.finished()) {
+        int decompressedSize = inflater.inflate(buffer);
+        outputStream.write(buffer, 0, decompressedSize);
+      }
+    } catch (DataFormatException e) {
+      throw new RuntimeException(e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  @Override
+  public int hashCode() {
+    return DeflaterMetaCompressor.class.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    return o != null && getClass() == o.getClass();
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/meta/MetaCompressor.java 
b/java/fury-core/src/main/java/org/apache/fury/meta/MetaCompressor.java
new file mode 100644
index 00000000..e2a2af19
--- /dev/null
+++ b/java/fury-core/src/main/java/org/apache/fury/meta/MetaCompressor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.meta;
+
+/**
+ * An interface used to compress class metadata such as field names and types. 
The implementation of
+ * this interface should be thread safe.
+ */
+public interface MetaCompressor {
+  byte[] compress(byte[] data, int offset, int size);
+
+  byte[] decompress(byte[] data, int offset, int size);
+
+  /**
+   * Check whether {@link MetaCompressor} implements `equals/hashCode` method. 
If not implemented,
+   * return {@link TypeEqualMetaCompressor} instead which compare equality by 
the compressor type
+   * for better serializer compile cache.
+   */
+  static MetaCompressor checkMetaCompressor(MetaCompressor compressor) {
+    Class<?> clz = compressor.getClass();
+    if (clz != DeflaterMetaCompressor.class) {
+      while (clz != null) {
+        try {
+          clz.getDeclaredMethod("hashCode");
+          if (clz == Object.class) {
+            return new TypeEqualMetaCompressor(compressor);
+          }
+          break;
+        } catch (NoSuchMethodException e) {
+          clz = clz.getSuperclass();
+        }
+      }
+    }
+    return compressor;
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/meta/TypeEqualMetaCompressor.java
 
b/java/fury-core/src/main/java/org/apache/fury/meta/TypeEqualMetaCompressor.java
new file mode 100644
index 00000000..2fa9133e
--- /dev/null
+++ 
b/java/fury-core/src/main/java/org/apache/fury/meta/TypeEqualMetaCompressor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.meta;
+
+import org.apache.fury.logging.Logger;
+import org.apache.fury.logging.LoggerFactory;
+
+/**
+ * A {@link MetaCompressor} wrapper which compare equality by the compressor 
type for better
+ * serializer compile cache.
+ */
+class TypeEqualMetaCompressor implements MetaCompressor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TypeEqualMetaCompressor.class);
+
+  private final MetaCompressor compressor;
+
+  public TypeEqualMetaCompressor(MetaCompressor compressor) {
+    this.compressor = compressor;
+    LOG.warn(
+        "{} should implement equals/hashCode method, "
+            + "otherwise compile cache may won't work. "
+            + "Use type to check MetaCompressor identity instead, but this"
+            + "may be incorrect if different compressor instance of same type "
+            + "indicates different compressor.",
+        compressor);
+  }
+
+  @Override
+  public byte[] compress(byte[] data, int offset, int size) {
+    return compressor.compress(data, offset, size);
+  }
+
+  @Override
+  public byte[] decompress(byte[] data, int offset, int size) {
+    return compressor.decompress(data, offset, size);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    return compressor.getClass().equals((((TypeEqualMetaCompressor) 
obj).compressor).getClass());
+  }
+
+  @Override
+  public int hashCode() {
+    return compressor.getClass().hashCode();
+  }
+}
diff --git a/java/fury-core/src/main/java/org/apache/fury/util/MathUtils.java 
b/java/fury-core/src/main/java/org/apache/fury/util/MathUtils.java
index eabeb994..8b96d170 100644
--- a/java/fury-core/src/main/java/org/apache/fury/util/MathUtils.java
+++ b/java/fury-core/src/main/java/org/apache/fury/util/MathUtils.java
@@ -43,4 +43,8 @@ public class MathUtils {
   public static long floorMod(long x, long y, long floorDiv) {
     return x - floorDiv * y;
   }
+
+  public static int toInt(boolean c) {
+    return c ? 1 : 0;
+  }
 }
diff --git 
a/java/fury-core/src/test/java/org/apache/fury/config/FuryBuilderTest.java 
b/java/fury-core/src/test/java/org/apache/fury/config/FuryBuilderTest.java
new file mode 100644
index 00000000..8bd69c32
--- /dev/null
+++ b/java/fury-core/src/test/java/org/apache/fury/config/FuryBuilderTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.config;
+
+import static org.testng.Assert.*;
+
+import org.apache.fury.meta.MetaCompressor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FuryBuilderTest {
+
+  @Test
+  public void testWithMetaCompressor() {
+    MetaCompressor metaCompressor =
+        new FuryBuilder()
+            .withMetaCompressor(
+                new MetaCompressor() {
+                  @Override
+                  public byte[] compress(byte[] data, int offset, int size) {
+                    return new byte[0];
+                  }
+
+                  @Override
+                  public byte[] decompress(byte[] compressedData, int offset, 
int size) {
+                    return new byte[0];
+                  }
+                })
+            .metaCompressor;
+    Assert.assertEquals(metaCompressor.getClass().getSimpleName(), 
"TypeEqualMetaCompressor");
+    new FuryBuilder()
+        .withMetaCompressor(
+            new MetaCompressor() {
+              @Override
+              public byte[] compress(byte[] data, int offset, int size) {
+                return new byte[0];
+              }
+
+              @Override
+              public byte[] decompress(byte[] compressedData, int offset, int 
size) {
+                return new byte[0];
+              }
+
+              @Override
+              public boolean equals(Object o) {
+                if (this == o) {
+                  return true;
+                }
+                return o != null && getClass() == o.getClass();
+              }
+
+              @Override
+              public int hashCode() {
+                return getClass().hashCode();
+              }
+            });
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org

Reply via email to