This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4300f93f9 [#1745] feat(remote merge): Introduce kryo serializer for 
spark. (#2395)
4300f93f9 is described below

commit 4300f93f91b240bbbc0457211c1533297d549fdd
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Mar 14 12:29:51 2025 +0800

    [#1745] feat(remote merge): Introduce kryo serializer for spark. (#2395)
    
    ### What changes were proposed in this pull request?
    
    Introduce kryo serializer for spark.
    
    ### Why are the changes needed?
    
    #1745
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test,  real job in cluster
---
 client-spark/spark2-shaded/pom.xml                 |   4 +
 client-spark/spark3-shaded/pom.xml                 |   4 +
 .../client/record/reader/RMRecordsReaderTest.java  |   8 ++
 .../client/record/writer/RecordCollectionTest.java |   4 +
 common/pom.xml                                     |   4 +
 .../apache/uniffle/common/config/RssBaseConf.java  |  28 +++-
 .../serializer/kryo/KryoDeserializationStream.java | 109 +++++++++++++++
 .../serializer/kryo/KryoSerializationStream.java   |  86 ++++++++++++
 .../common/serializer/kryo/KryoSerializer.java     |  41 ++++++
 .../serializer/kryo/KryoSerializerInstance.java    | 118 ++++++++++++++++
 .../common/serializer/kryo/PoolWrapper.java        |  68 +++++++++
 .../apache/uniffle/common/merger/MergerTest.java   |   2 +
 .../common/records/RecordsReaderWriterTest.java    |   4 +
 .../common/serializer/KryoSerializerTest.java      | 152 +++++++++++++++++++++
 .../common/serializer/SerializerFactoryTest.java   |  12 +-
 .../test/RemoteMergeShuffleWithRssClientTest.java  |  16 +++
 ...ShuffleWithRssClientTestWhenShuffleFlushed.java |  16 +++
 pom.xml                                            |  14 +-
 .../server/merge/BlockFlushFileReaderTest.java     |   6 +
 .../uniffle/server/merge/MergedResultTest.java     |   2 +
 .../server/merge/ShuffleMergeManagerTest.java      |   2 +
 21 files changed, 697 insertions(+), 3 deletions(-)

diff --git a/client-spark/spark2-shaded/pom.xml 
b/client-spark/spark2-shaded/pom.xml
index 6f8066ccb..22ac1b01a 100644
--- a/client-spark/spark2-shaded/pom.xml
+++ b/client-spark/spark2-shaded/pom.xml
@@ -216,6 +216,10 @@
                   <pattern>META-INF/license</pattern>
                   <shadedPattern>META-INF/license-tmp</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware</pattern>
+                  
<shadedPattern>${rss.shade.packageName}.com.esotericsoftware</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/client-spark/spark3-shaded/pom.xml 
b/client-spark/spark3-shaded/pom.xml
index 67ef6a47a..018ea534e 100644
--- a/client-spark/spark3-shaded/pom.xml
+++ b/client-spark/spark3-shaded/pom.xml
@@ -216,6 +216,10 @@
                   <pattern>META-INF/license</pattern>
                   <shadedPattern>META-INF/license-tmp</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware</pattern>
+                  
<shadedPattern>${rss.shade.packageName}.com.esotericsoftware</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git 
a/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
 
b/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
index b1310260b..340624ada 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
@@ -62,6 +62,8 @@ public class RMRecordsReaderTest {
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testNormalReadWithoutCombine(String classes) throws Exception {
     // 1 basic parameter
@@ -119,6 +121,8 @@ public class RMRecordsReaderTest {
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testNormalReadWithCombine(String classes) throws Exception {
     // 1 basic parameter
@@ -199,6 +203,8 @@ public class RMRecordsReaderTest {
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testReadMulitPartitionWithoutCombine(String classes) throws 
Exception {
     // 1 basic parameter
@@ -269,6 +275,8 @@ public class RMRecordsReaderTest {
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testReadMulitPartitionWithCombine(String classes) throws 
Exception {
     // 1 basic parameter
diff --git 
a/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
 
b/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
index 2cc1ad8f7..c9cb98896 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
@@ -45,6 +45,8 @@ public class RecordCollectionTest {
   @ValueSource(
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testSortAndSerializeRecords(String classes) throws Exception {
     // 1 Parse arguments
@@ -97,6 +99,8 @@ public class RecordCollectionTest {
   @ValueSource(
       strings = {
         "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   public void testSortCombineAndSerializeRecords(String classes) throws 
Exception {
     // 1 Parse arguments
diff --git a/common/pom.xml b/common/pom.xml
index 5e307906b..7330b3fc6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -174,6 +174,10 @@
       <groupId>org.apache.hbase.thirdparty</groupId>
       <artifactId>hbase-shaded-jersey</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 12889e21e..8d4adf42c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
 import org.apache.uniffle.common.serializer.writable.WritableSerializer;
 import org.apache.uniffle.common.util.RssUtils;
 
@@ -278,10 +279,35 @@ public class RssBaseConf extends RssConf {
           .withDescription("start server service max retry");
 
   /* Serialization */
+  public static final ConfigOption<Boolean> RSS_KRYO_REGISTRATION_REQUIRED =
+      ConfigOptions.key("rss.kryo.registrationRequired")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether registration is required.");
+  public static final ConfigOption<Boolean> RSS_KRYO_REFERENCE_TRACKING =
+      ConfigOptions.key("rss.kryo.referenceTracking")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Whether to track references to the same object when serializing 
data with Kryo.");
+  public static final ConfigOption<Boolean> 
RSS_KRYO_SCALA_REGISTRATION_REQUIRED =
+      ConfigOptions.key("rss.kryo.scalaRegistrationRequired")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "Whether to require registration of some common scala classes, "
+                  + "usually used for spark applications");
+  public static final ConfigOption<String> RSS_KRYO_REGISTRATION_CLASSES =
+      ConfigOptions.key("rss.kryo.registrationClasses")
+          .stringType()
+          .defaultValue("")
+          .withDescription(
+              "The classes to be registered. This configuration must ensure 
that the"
+                  + "client and server are exactly the same. Dynamic 
configuration is recommended");
   public static final ConfigOption<String> RSS_IO_SERIALIZATIONS =
       ConfigOptions.key("rss.io.serializations")
           .stringType()
-          .defaultValue(WritableSerializer.class.getName())
+          .defaultValue(WritableSerializer.class.getName() + "," + 
KryoSerializer.class.getName())
           .withDescription("Serializations are used for creative Serializers 
and Deserializers");
 
   public static final ConfigOption<String> REST_AUTHORIZATION_CREDENTIALS =
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
new file mode 100644
index 000000000..a636a3f52
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Locale;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.SerInputStream;
+
+public class KryoDeserializationStream<K, V> extends DeserializationStream<K, 
V> {
+
+  private SerInputStream inputStream;
+  private Input input;
+
+  private final KryoSerializerInstance instance;
+  private Kryo kryo;
+  private long start;
+  private K currentKey;
+  private V currentValue;
+
+  public KryoDeserializationStream(
+      KryoSerializerInstance instance,
+      SerInputStream inputStream,
+      Class<K> keyClass,
+      Class<V> valueClass) {
+    this.inputStream = inputStream;
+    this.instance = instance;
+    this.start = inputStream.getStart();
+  }
+
+  @Override
+  public void init() {
+    this.inputStream.init();
+    this.input = new UnsafeInput(this.inputStream);
+    this.kryo = instance.borrowKryo();
+  }
+
+  @Override
+  public boolean nextRecord() throws IOException {
+    boolean hasNext = available() > 0;
+    if (hasNext) {
+      try {
+        this.currentKey = (K) kryo.readClassAndObject(input);
+        this.currentValue = (V) kryo.readClassAndObject(input);
+      } catch (KryoException e) {
+        if (e.getMessage().toLowerCase(Locale.ROOT).contains("buffer 
underflow")) {
+          throw new EOFException();
+        }
+        throw e;
+      }
+    }
+    return hasNext;
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException {
+    return this.currentKey;
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException {
+    return this.currentValue;
+  }
+
+  private long available() throws IOException {
+    // kryo use buffer to read inputStream, so we should use real read offset 
by input.total()
+    long totalBytesRead = start + input.total();
+    return inputStream.getEnd() - totalBytesRead;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (inputStream != null) {
+      inputStream.close();
+      inputStream = null;
+    }
+    if (input != null) {
+      try {
+        input.close();
+      } finally {
+        instance.releaseKryo(kryo);
+        kryo = null;
+        input = null;
+      }
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
new file mode 100644
index 000000000..78dd8beb5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+
+import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.WrappedByteArrayOutputStream;
+
+public class KryoSerializationStream<K, V> extends SerializationStream {
+
+  private final KryoSerializerInstance instance;
+  private SerOutputStream out;
+
+  private WrappedByteArrayOutputStream byteStream = new 
WrappedByteArrayOutputStream(200);
+  private Output output;
+  private Kryo kryo;
+
+  public KryoSerializationStream(KryoSerializerInstance instance, 
SerOutputStream out) {
+    this.out = out;
+    this.instance = instance;
+  }
+
+  @Override
+  public void init() {
+    this.output = new UnsafeOutput(byteStream);
+    this.kryo = instance.borrowKryo();
+  }
+
+  @Override
+  public void writeRecord(Object key, Object value) throws IOException {
+    byteStream.reset();
+    kryo.writeClassAndObject(output, key);
+    kryo.writeClassAndObject(output, value);
+    output.flush();
+    int length = byteStream.size();
+    out.preAllocate(length);
+    out.write(byteStream.getBuf(), 0, length);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (output == null) {
+      throw new IOException("Stream is closed");
+    }
+    out.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (output != null) {
+      try {
+        output.close();
+      } finally {
+        this.instance.releaseKryo(kryo);
+        kryo = null;
+        output = null;
+      }
+    }
+  }
+
+  @Override
+  public long getTotalBytesWritten() {
+    return output.total();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
new file mode 100644
index 000000000..f85d709c6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class KryoSerializer extends Serializer {
+
+  private final RssConf rssConf;
+
+  public KryoSerializer(RssConf rssConf) {
+    this.rssConf = rssConf;
+  }
+
+  @Override
+  public SerializerInstance newInstance() {
+    return new KryoSerializerInstance(rssConf);
+  }
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return true;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
new file mode 100644
index 000000000..31357b011
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.DataInputBuffer;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.SerInputStream;
+import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_KRYO_REGISTRATION_CLASSES;
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_KRYO_SCALA_REGISTRATION_REQUIRED;
+
+public class KryoSerializerInstance extends SerializerInstance {
+
+  private final PoolWrapper pool;
+  private final boolean scalaRegistrationRequired;
+  private final RssConf rssConf;
+
+  public KryoSerializerInstance(RssConf rssConf) {
+    this.pool = new PoolWrapper(rssConf);
+    this.scalaRegistrationRequired = 
rssConf.getBoolean(RSS_KRYO_SCALA_REGISTRATION_REQUIRED);
+    this.rssConf = rssConf;
+  }
+
+  public Kryo borrowKryo() {
+    Kryo kryo = pool.borrow();
+    kryo.reset();
+    try {
+      Class cls = ClassUtils.getClass("com.twitter.chill.AllScalaRegistrar");
+      Object obj = cls.newInstance();
+      Method method = cls.getDeclaredMethod("apply", Kryo.class);
+      for (String className : 
StringUtils.split(rssConf.get(RSS_KRYO_REGISTRATION_CLASSES), ",")) {
+        kryo.register(
+            
ClassUtils.getClass(Thread.currentThread().getContextClassLoader(), className));
+      }
+      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+      method.invoke(obj, kryo);
+    } catch (ClassNotFoundException
+        | InvocationTargetException
+        | InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException e) {
+      if (scalaRegistrationRequired) {
+        throw new RssException(e);
+      }
+    }
+    return kryo;
+  }
+
+  public void releaseKryo(Kryo kryo) {
+    pool.release(kryo);
+  }
+
+  public <T> void serialize(T t, DataOutputStream out) throws IOException {
+    Output output = new UnsafeOutput(out);
+    Kryo kryo = this.borrowKryo();
+    try {
+      kryo.writeClassAndObject(output, t);
+      output.flush();
+    } finally {
+      releaseKryo(kryo);
+    }
+  }
+
+  @Override
+  public <T> T deserialize(DataInputBuffer buffer, Class vClass) throws 
IOException {
+    UnsafeInput input = new UnsafeInput(buffer);
+    Kryo kryo = this.borrowKryo();
+    try {
+      return (T) kryo.readClassAndObject(input);
+    } finally {
+      releaseKryo(kryo);
+    }
+  }
+
+  @Override
+  public <K, V> SerializationStream serializeStream(
+      SerOutputStream output, boolean raw, boolean shared) {
+    return new KryoSerializationStream(this, output);
+  }
+
+  @Override
+  public <K, V> DeserializationStream deserializeStream(
+      SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean 
raw, boolean shared) {
+    return new KryoDeserializationStream(this, input, keyClass, valueClass);
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
new file mode 100644
index 000000000..a498ddf04
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.pool.KryoCallback;
+import com.esotericsoftware.kryo.pool.KryoPool;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+
+public class PoolWrapper implements KryoPool {
+
+  private KryoPool pool;
+  private final boolean registrationRequired;
+  private final boolean referenceTracking;
+
+  public PoolWrapper(RssConf conf) {
+    this.pool = newKryoPool();
+    this.registrationRequired = 
conf.getBoolean(RssBaseConf.RSS_KRYO_REGISTRATION_REQUIRED);
+    this.referenceTracking = 
conf.getBoolean(RssBaseConf.RSS_KRYO_REFERENCE_TRACKING);
+  }
+
+  private KryoPool newKryoPool() {
+    return new Builder(() -> newKryo()).softReferences().build();
+  }
+
+  public Kryo newKryo() {
+    Kryo kryo = new Kryo();
+    kryo.setRegistrationRequired(this.registrationRequired);
+    kryo.setReferences(this.referenceTracking);
+    return kryo;
+  }
+
+  @Override
+  public Kryo borrow() {
+    return pool.borrow();
+  }
+
+  @Override
+  public void release(Kryo kryo) {
+    pool.release(kryo);
+  }
+
+  @Override
+  public <T> T run(KryoCallback<T> callback) {
+    return pool.run(callback);
+  }
+
+  public void reset() {
+    pool = newKryoPool();
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java 
b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
index 529496f82..473069ff6 100644
--- a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
@@ -53,6 +53,8 @@ class MergerTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   void testMergeSegmentToFile(String classes, @TempDir File tmpDir) throws 
Exception {
     // 1 Parse arguments
diff --git 
a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
 
b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
index b90d93fd7..54c852d16 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
@@ -59,6 +59,10 @@ class RecordsReaderWriterTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,false,false",
+        "java.lang.String,java.lang.Integer,mem",
+        "java.lang.String,java.lang.Integer,file",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,mem",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,file"
       })
   void testWriteAndReadRecordFile(String classes, @TempDir File tmpDir) throws 
Exception {
     RssConf rssConf = new RssConf();
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
new file mode 100644
index 000000000..656a522b1
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.File;
+import java.util.Random;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.SerializerUtils.SomeClass;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
+
+import static org.apache.uniffle.common.serializer.SerializerUtils.genData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KryoSerializerTest {
+
+  private static final int LOOP = 1009;
+  private static RssConf rssConf = new RssConf();
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testKryoWriteRandomRead(boolean isFileMode, @TempDir File tmpDir) 
throws Exception {
+    Kryo kryo = new Kryo();
+    // 1 Write object to stream
+    SerOutputStream outputStream =
+        isFileMode
+            ? new FileSerOutputStream(new File(tmpDir, "tmp.data"))
+            : new DynBufferSerOutputStream();
+    Output output = new Output(outputStream);
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      SomeClass object = (SomeClass) SerializerUtils.genData(SomeClass.class, 
i);
+      kryo.writeObject(output, object);
+      offsets[i] = output.total();
+    }
+    output.close();
+
+    // 2 Read object from every offset
+    ByteBuf byteBuf = isFileMode ? null : outputStream.toByteBuf();
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      Input input =
+          isFileMode
+              ? new Input(SerInputStream.newInputStream(new File(tmpDir, 
"tmp.data"), off))
+              : new Input(SerInputStream.newInputStream(byteBuf, (int) off));
+      for (int j = i + 1; j < LOOP; j++) {
+        SomeClass object = kryo.readObject(input, SomeClass.class);
+        assertEquals(SerializerUtils.genData(SomeClass.class, j), object);
+      }
+      input.close();
+    }
+    if (!isFileMode) {
+      byteBuf.release();
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "java.lang.String,java.lang.Integer,mem",
+        "java.lang.String,java.lang.Integer,file",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,int,mem",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,int,file"
+      })
+  public void testSerDeKeyValues(String classes, @TempDir File tmpDir) throws 
Exception {
+    // 1 Construct serializer
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+
+    KryoSerializer serializer = new KryoSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    SerOutputStream outputStream =
+        isFileMode
+            ? new FileSerOutputStream(new File(tmpDir, "tmp.kryo"))
+            : new DynBufferSerOutputStream();
+    SerializationStream serializationStream = 
instance.serializeStream(outputStream, false, false);
+    serializationStream.init();
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      serializationStream.writeRecord(genData(keyClass, i), 
genData(valueClass, i));
+      offsets[i] = serializationStream.getTotalBytesWritten();
+    }
+    serializationStream.close();
+
+    // 3 Random read
+    ByteBuf byteBuf = isFileMode ? null : outputStream.toByteBuf();
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      SerInputStream inputStream =
+          isFileMode
+              ? SerInputStream.newInputStream(new File(tmpDir, "tmp.kryo"), 
off)
+              : SerInputStream.newInputStream(byteBuf, (int) off);
+      DeserializationStream deserializationStream =
+          instance.deserializeStream(inputStream, keyClass, valueClass, false, 
false);
+      deserializationStream.init();
+      for (int j = i + 1; j < LOOP; j++) {
+        assertTrue(deserializationStream.nextRecord());
+        assertEquals(genData(keyClass, j), 
deserializationStream.getCurrentKey());
+        assertEquals(genData(valueClass, j), 
deserializationStream.getCurrentValue());
+      }
+      deserializationStream.close();
+    }
+    if (!isFileMode) {
+      byteBuf.release();
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(
+      classes = {java.lang.String.class, java.lang.Integer.class, 
SomeClass.class, int.class})
+  public void testSerDeObject(Class aClass) throws Exception {
+    KryoSerializer serializer = new KryoSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+    int number = new Random().nextInt(99999);
+    DataOutputBuffer output = new DataOutputBuffer();
+    instance.serialize(genData(aClass, number), output);
+    DataInputBuffer input = new DataInputBuffer();
+    input.reset(output.getData(), 0, output.getData().length);
+    Object obj = instance.deserialize(input, aClass);
+    assertEquals(genData(aClass, number), obj);
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
index b7fd15520..b0ffc568f 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.io.Writable;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
 import org.apache.uniffle.common.serializer.writable.WritableSerializer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,13 +39,22 @@ public class SerializerFactoryTest {
     // 1 Test whether it is null
     assertNotNull(factory.getSerializer(Writable.class));
     assertNotNull(factory.getSerializer(IntWritable.class));
+    assertNotNull(factory.getSerializer(SerializerUtils.SomeClass.class));
+    assertNotNull(factory.getSerializer(String.class));
+    assertNotNull(factory.getSerializer(int.class));
 
     // 2 Check whether the type serializer is right
     assertInstanceOf(WritableSerializer.class, 
factory.getSerializer(Writable.class));
     assertInstanceOf(WritableSerializer.class, 
factory.getSerializer(IntWritable.class));
+    assertInstanceOf(KryoSerializer.class, 
factory.getSerializer(SerializerUtils.SomeClass.class));
+    assertInstanceOf(KryoSerializer.class, 
factory.getSerializer(String.class));
+    assertInstanceOf(KryoSerializer.class, factory.getSerializer(int.class));
 
-    // 2 Check whether the serializer is cached
+    // 3 Check whether the serializer is cached
     Serializer serializer = factory.getSerializer(Writable.class);
     assertEquals(serializer, factory.getSerializer(IntWritable.class));
+    serializer = factory.getSerializer(SerializerUtils.SomeClass.class);
+    assertEquals(serializer, factory.getSerializer(String.class));
+    assertEquals(serializer, factory.getSerializer(int.class));
   }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
index 809e28a06..e32cdedd0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
@@ -135,6 +135,10 @@ public class RemoteMergeShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTest(String classes) throws Exception {
@@ -302,6 +306,10 @@ public class RemoteMergeShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestWithCombine(String classes) throws 
Exception {
@@ -482,6 +490,10 @@ public class RemoteMergeShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestMultiPartition(String classes) throws 
Exception {
@@ -692,6 +704,10 @@ public class RemoteMergeShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestMultiPartitionWithCombine(String 
classes) throws Exception {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
index 51276e128..0e83ed0cd 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
@@ -149,6 +149,10 @@ public class 
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTest(String classes) throws Exception {
@@ -316,6 +320,10 @@ public class 
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestWithCombine(String classes) throws 
Exception {
@@ -497,6 +505,10 @@ public class 
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestMultiPartition(String classes) throws 
Exception {
@@ -707,6 +719,10 @@ public class 
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+        "java.lang.String,java.lang.Integer,GRPC",
+        "java.lang.String,java.lang.Integer,GRPC_NETTY",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
       })
   @Timeout(10)
   public void remoteMergeWriteReadTestMultiPartitionWithCombine(String 
classes) throws Exception {
diff --git a/pom.xml b/pom.xml
index 5085cfa65..9d96c71ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
     <skipITs>${skipTests}</skipITs>
     <skipBuildImage>true</skipBuildImage>
     <snakeyaml.version>2.2</snakeyaml.version>
+    <kryo.version>4.0.2</kryo.version>
   </properties>
 
   <repositories>
@@ -801,13 +802,24 @@
         <artifactId>log4j-core</artifactId>
         <version>${log4j.version}</version>
       </dependency>
-      <!-- end -->
       <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <version>${lombok.version}</version>
         <scope>provided</scope>
       </dependency>
+      <dependency>
+        <groupId>com.esotericsoftware</groupId>
+        <artifactId>kryo-shaded</artifactId>
+        <version>${kryo.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.objenesis</groupId>
+            <artifactId>objenesis</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <!-- end -->
     </dependencies>
   </dependencyManagement>
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
 
b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
index 3d93d93e2..e2681981f 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
@@ -67,6 +67,10 @@ public class BlockFlushFileReaderTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,false,false",
+        "java.lang.String,java.lang.Integer,2",
+        "java.lang.String,java.lang.Integer,8",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,2",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,8",
       })
   void writeTestWithMerge(String classes, @TempDir File tmpDir) throws 
Exception {
     final String[] classArray = classes.split(",");
@@ -166,6 +170,8 @@ public class BlockFlushFileReaderTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,false,false",
+        "java.lang.String,java.lang.Integer,2",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,2",
       })
   void writeTestWithMergeWhenInterrupted(String classes, @TempDir File tmpDir) 
throws Exception {
     String[] classArray = classes.split(",");
diff --git 
a/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java 
b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
index 66fbeb782..e60821e14 100644
--- a/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
@@ -104,6 +104,8 @@ class MergedResultTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   void testMergeSegmentToMergeResult(String classes, @TempDir File tmpDir) 
throws Exception {
     // 1 Parse arguments
diff --git 
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
index 0eb4ed1e8..ce2ad6f9b 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
@@ -111,6 +111,8 @@ public class ShuffleMergeManagerTest {
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
         
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+        "java.lang.String,java.lang.Integer",
+        
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
       })
   void testMergerManager(String classes) throws Exception {
     // 1 Construct serializer and comparator


Reply via email to