This is an automated email from the ASF dual-hosted git repository.
zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4300f93f9 [#1745] feat(remote merge): Introduce kryo serializer for
spark. (#2395)
4300f93f9 is described below
commit 4300f93f91b240bbbc0457211c1533297d549fdd
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Mar 14 12:29:51 2025 +0800
[#1745] feat(remote merge): Introduce kryo serializer for spark. (#2395)
### What changes were proposed in this pull request?
Introduce kryo serializer for spark.
### Why are the changes needed?
#1745
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test, real job in cluster
---
client-spark/spark2-shaded/pom.xml | 4 +
client-spark/spark3-shaded/pom.xml | 4 +
.../client/record/reader/RMRecordsReaderTest.java | 8 ++
.../client/record/writer/RecordCollectionTest.java | 4 +
common/pom.xml | 4 +
.../apache/uniffle/common/config/RssBaseConf.java | 28 +++-
.../serializer/kryo/KryoDeserializationStream.java | 109 +++++++++++++++
.../serializer/kryo/KryoSerializationStream.java | 86 ++++++++++++
.../common/serializer/kryo/KryoSerializer.java | 41 ++++++
.../serializer/kryo/KryoSerializerInstance.java | 118 ++++++++++++++++
.../common/serializer/kryo/PoolWrapper.java | 68 +++++++++
.../apache/uniffle/common/merger/MergerTest.java | 2 +
.../common/records/RecordsReaderWriterTest.java | 4 +
.../common/serializer/KryoSerializerTest.java | 152 +++++++++++++++++++++
.../common/serializer/SerializerFactoryTest.java | 12 +-
.../test/RemoteMergeShuffleWithRssClientTest.java | 16 +++
...ShuffleWithRssClientTestWhenShuffleFlushed.java | 16 +++
pom.xml | 14 +-
.../server/merge/BlockFlushFileReaderTest.java | 6 +
.../uniffle/server/merge/MergedResultTest.java | 2 +
.../server/merge/ShuffleMergeManagerTest.java | 2 +
21 files changed, 697 insertions(+), 3 deletions(-)
diff --git a/client-spark/spark2-shaded/pom.xml
b/client-spark/spark2-shaded/pom.xml
index 6f8066ccb..22ac1b01a 100644
--- a/client-spark/spark2-shaded/pom.xml
+++ b/client-spark/spark2-shaded/pom.xml
@@ -216,6 +216,10 @@
<pattern>META-INF/license</pattern>
<shadedPattern>META-INF/license-tmp</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.esotericsoftware</pattern>
+
<shadedPattern>${rss.shade.packageName}.com.esotericsoftware</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/client-spark/spark3-shaded/pom.xml
b/client-spark/spark3-shaded/pom.xml
index 67ef6a47a..018ea534e 100644
--- a/client-spark/spark3-shaded/pom.xml
+++ b/client-spark/spark3-shaded/pom.xml
@@ -216,6 +216,10 @@
<pattern>META-INF/license</pattern>
<shadedPattern>META-INF/license-tmp</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.esotericsoftware</pattern>
+
<shadedPattern>${rss.shade.packageName}.com.esotericsoftware</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
b/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
index b1310260b..340624ada 100644
---
a/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
@@ -62,6 +62,8 @@ public class RMRecordsReaderTest {
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testNormalReadWithoutCombine(String classes) throws Exception {
// 1 basic parameter
@@ -119,6 +121,8 @@ public class RMRecordsReaderTest {
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testNormalReadWithCombine(String classes) throws Exception {
// 1 basic parameter
@@ -199,6 +203,8 @@ public class RMRecordsReaderTest {
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testReadMulitPartitionWithoutCombine(String classes) throws
Exception {
// 1 basic parameter
@@ -269,6 +275,8 @@ public class RMRecordsReaderTest {
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testReadMulitPartitionWithCombine(String classes) throws
Exception {
// 1 basic parameter
diff --git
a/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
b/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
index 2cc1ad8f7..c9cb98896 100644
---
a/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
@@ -45,6 +45,8 @@ public class RecordCollectionTest {
@ValueSource(
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testSortAndSerializeRecords(String classes) throws Exception {
// 1 Parse arguments
@@ -97,6 +99,8 @@ public class RecordCollectionTest {
@ValueSource(
strings = {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
public void testSortCombineAndSerializeRecords(String classes) throws
Exception {
// 1 Parse arguments
diff --git a/common/pom.xml b/common/pom.xml
index 5e307906b..7330b3fc6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -174,6 +174,10 @@
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-jersey</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 12889e21e..8d4adf42c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
import org.apache.uniffle.common.serializer.writable.WritableSerializer;
import org.apache.uniffle.common.util.RssUtils;
@@ -278,10 +279,35 @@ public class RssBaseConf extends RssConf {
.withDescription("start server service max retry");
/* Serialization */
+ public static final ConfigOption<Boolean> RSS_KRYO_REGISTRATION_REQUIRED =
+ ConfigOptions.key("rss.kryo.registrationRequired")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether registration is required.");
+ public static final ConfigOption<Boolean> RSS_KRYO_REFERENCE_TRACKING =
+ ConfigOptions.key("rss.kryo.referenceTracking")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to track references to the same object when serializing
data with Kryo.");
+ public static final ConfigOption<Boolean>
RSS_KRYO_SCALA_REGISTRATION_REQUIRED =
+ ConfigOptions.key("rss.kryo.scalaRegistrationRequired")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to require registration of some common scala classes, "
+ + "usually used for spark applications");
+ public static final ConfigOption<String> RSS_KRYO_REGISTRATION_CLASSES =
+ ConfigOptions.key("rss.kryo.registrationClasses")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+              "The classes to be registered. This configuration must ensure that the "
+                  + "client and server are exactly the same. Dynamic configuration is recommended.");
public static final ConfigOption<String> RSS_IO_SERIALIZATIONS =
ConfigOptions.key("rss.io.serializations")
.stringType()
- .defaultValue(WritableSerializer.class.getName())
+ .defaultValue(WritableSerializer.class.getName() + "," +
KryoSerializer.class.getName())
.withDescription("Serializations are used for creative Serializers
and Deserializers");
public static final ConfigOption<String> REST_AUTHORIZATION_CREDENTIALS =
diff --git
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
new file mode 100644
index 000000000..a636a3f52
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoDeserializationStream.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Locale;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.SerInputStream;
+
+public class KryoDeserializationStream<K, V> extends DeserializationStream<K,
V> {
+
+ private SerInputStream inputStream;
+ private Input input;
+
+ private final KryoSerializerInstance instance;
+ private Kryo kryo;
+ private long start;
+ private K currentKey;
+ private V currentValue;
+
+ public KryoDeserializationStream(
+ KryoSerializerInstance instance,
+ SerInputStream inputStream,
+ Class<K> keyClass,
+ Class<V> valueClass) {
+ this.inputStream = inputStream;
+ this.instance = instance;
+ this.start = inputStream.getStart();
+ }
+
+ @Override
+ public void init() {
+ this.inputStream.init();
+ this.input = new UnsafeInput(this.inputStream);
+ this.kryo = instance.borrowKryo();
+ }
+
+ @Override
+ public boolean nextRecord() throws IOException {
+ boolean hasNext = available() > 0;
+ if (hasNext) {
+ try {
+ this.currentKey = (K) kryo.readClassAndObject(input);
+ this.currentValue = (V) kryo.readClassAndObject(input);
+ } catch (KryoException e) {
+ if (e.getMessage().toLowerCase(Locale.ROOT).contains("buffer
underflow")) {
+ throw new EOFException();
+ }
+ throw e;
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public K getCurrentKey() throws IOException {
+ return this.currentKey;
+ }
+
+ @Override
+ public V getCurrentValue() throws IOException {
+ return this.currentValue;
+ }
+
+ private long available() throws IOException {
+ // kryo use buffer to read inputStream, so we should use real read offset
by input.total()
+ long totalBytesRead = start + input.total();
+ return inputStream.getEnd() - totalBytesRead;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ if (input != null) {
+ try {
+ input.close();
+ } finally {
+ instance.releaseKryo(kryo);
+ kryo = null;
+ input = null;
+ }
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
new file mode 100644
index 000000000..78dd8beb5
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializationStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+
+import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.WrappedByteArrayOutputStream;
+
+public class KryoSerializationStream<K, V> extends SerializationStream {
+
+ private final KryoSerializerInstance instance;
+ private SerOutputStream out;
+
+ private WrappedByteArrayOutputStream byteStream = new
WrappedByteArrayOutputStream(200);
+ private Output output;
+ private Kryo kryo;
+
+ public KryoSerializationStream(KryoSerializerInstance instance,
SerOutputStream out) {
+ this.out = out;
+ this.instance = instance;
+ }
+
+ @Override
+ public void init() {
+ this.output = new UnsafeOutput(byteStream);
+ this.kryo = instance.borrowKryo();
+ }
+
+ @Override
+ public void writeRecord(Object key, Object value) throws IOException {
+ byteStream.reset();
+ kryo.writeClassAndObject(output, key);
+ kryo.writeClassAndObject(output, value);
+ output.flush();
+ int length = byteStream.size();
+ out.preAllocate(length);
+ out.write(byteStream.getBuf(), 0, length);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (output == null) {
+ throw new IOException("Stream is closed");
+ }
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (output != null) {
+ try {
+ output.close();
+ } finally {
+ this.instance.releaseKryo(kryo);
+ kryo = null;
+ output = null;
+ }
+ }
+ }
+
+ @Override
+ public long getTotalBytesWritten() {
+ return output.total();
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
new file mode 100644
index 000000000..f85d709c6
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class KryoSerializer extends Serializer {
+
+ private final RssConf rssConf;
+
+ public KryoSerializer(RssConf rssConf) {
+ this.rssConf = rssConf;
+ }
+
+ @Override
+ public SerializerInstance newInstance() {
+ return new KryoSerializerInstance(rssConf);
+ }
+
+ @Override
+ public boolean accept(Class<?> c) {
+ return true;
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
new file mode 100644
index 000000000..31357b011
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/KryoSerializerInstance.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.DataInputBuffer;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.SerInputStream;
+import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_KRYO_REGISTRATION_CLASSES;
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_KRYO_SCALA_REGISTRATION_REQUIRED;
+
+public class KryoSerializerInstance extends SerializerInstance {
+
+ private final PoolWrapper pool;
+ private final boolean scalaRegistrationRequired;
+ private final RssConf rssConf;
+
+ public KryoSerializerInstance(RssConf rssConf) {
+ this.pool = new PoolWrapper(rssConf);
+ this.scalaRegistrationRequired =
rssConf.getBoolean(RSS_KRYO_SCALA_REGISTRATION_REQUIRED);
+ this.rssConf = rssConf;
+ }
+
+ public Kryo borrowKryo() {
+ Kryo kryo = pool.borrow();
+ kryo.reset();
+ try {
+ Class cls = ClassUtils.getClass("com.twitter.chill.AllScalaRegistrar");
+ Object obj = cls.newInstance();
+ Method method = cls.getDeclaredMethod("apply", Kryo.class);
+ for (String className :
StringUtils.split(rssConf.get(RSS_KRYO_REGISTRATION_CLASSES), ",")) {
+ kryo.register(
+
ClassUtils.getClass(Thread.currentThread().getContextClassLoader(), className));
+ }
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ method.invoke(obj, kryo);
+ } catch (ClassNotFoundException
+ | InvocationTargetException
+ | InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException e) {
+ if (scalaRegistrationRequired) {
+ throw new RssException(e);
+ }
+ }
+ return kryo;
+ }
+
+ public void releaseKryo(Kryo kryo) {
+ pool.release(kryo);
+ }
+
+ public <T> void serialize(T t, DataOutputStream out) throws IOException {
+ Output output = new UnsafeOutput(out);
+ Kryo kryo = this.borrowKryo();
+ try {
+ kryo.writeClassAndObject(output, t);
+ output.flush();
+ } finally {
+ releaseKryo(kryo);
+ }
+ }
+
+ @Override
+ public <T> T deserialize(DataInputBuffer buffer, Class vClass) throws
IOException {
+ UnsafeInput input = new UnsafeInput(buffer);
+ Kryo kryo = this.borrowKryo();
+ try {
+ return (T) kryo.readClassAndObject(input);
+ } finally {
+ releaseKryo(kryo);
+ }
+ }
+
+ @Override
+ public <K, V> SerializationStream serializeStream(
+ SerOutputStream output, boolean raw, boolean shared) {
+ return new KryoSerializationStream(this, output);
+ }
+
+ @Override
+ public <K, V> DeserializationStream deserializeStream(
+ SerInputStream input, Class<K> keyClass, Class<V> valueClass, boolean
raw, boolean shared) {
+ return new KryoDeserializationStream(this, input, keyClass, valueClass);
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
new file mode 100644
index 000000000..a498ddf04
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/serializer/kryo/PoolWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.pool.KryoCallback;
+import com.esotericsoftware.kryo.pool.KryoPool;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+
+public class PoolWrapper implements KryoPool {
+
+ private KryoPool pool;
+ private final boolean registrationRequired;
+ private final boolean referenceTracking;
+
+ public PoolWrapper(RssConf conf) {
+ this.pool = newKryoPool();
+ this.registrationRequired =
conf.getBoolean(RssBaseConf.RSS_KRYO_REGISTRATION_REQUIRED);
+ this.referenceTracking =
conf.getBoolean(RssBaseConf.RSS_KRYO_REFERENCE_TRACKING);
+ }
+
+ private KryoPool newKryoPool() {
+ return new Builder(() -> newKryo()).softReferences().build();
+ }
+
+ public Kryo newKryo() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(this.registrationRequired);
+ kryo.setReferences(this.referenceTracking);
+ return kryo;
+ }
+
+ @Override
+ public Kryo borrow() {
+ return pool.borrow();
+ }
+
+ @Override
+ public void release(Kryo kryo) {
+ pool.release(kryo);
+ }
+
+ @Override
+ public <T> T run(KryoCallback<T> callback) {
+ return pool.run(callback);
+ }
+
+ public void reset() {
+ pool = newKryoPool();
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
index 529496f82..473069ff6 100644
--- a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
@@ -53,6 +53,8 @@ class MergerTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+ "java.lang.String,java.lang.Integer",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
void testMergeSegmentToFile(String classes, @TempDir File tmpDir) throws
Exception {
// 1 Parse arguments
diff --git
a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
index b90d93fd7..54c852d16 100644
---
a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
@@ -59,6 +59,10 @@ class RecordsReaderWriterTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file,false,false",
+ "java.lang.String,java.lang.Integer,mem",
+ "java.lang.String,java.lang.Integer,file",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,mem",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,file"
})
void testWriteAndReadRecordFile(String classes, @TempDir File tmpDir) throws
Exception {
RssConf rssConf = new RssConf();
diff --git
a/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
b/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
new file mode 100644
index 000000000..656a522b1
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/serializer/KryoSerializerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.File;
+import java.util.Random;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.SerializerUtils.SomeClass;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
+
+import static org.apache.uniffle.common.serializer.SerializerUtils.genData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KryoSerializerTest {
+
+ private static final int LOOP = 1009;
+ private static RssConf rssConf = new RssConf();
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testKryoWriteRandomRead(boolean isFileMode, @TempDir File tmpDir)
throws Exception {
+ Kryo kryo = new Kryo();
+ // 1 Write object to stream
+ SerOutputStream outputStream =
+ isFileMode
+ ? new FileSerOutputStream(new File(tmpDir, "tmp.data"))
+ : new DynBufferSerOutputStream();
+ Output output = new Output(outputStream);
+ long[] offsets = new long[LOOP];
+ for (int i = 0; i < LOOP; i++) {
+ SomeClass object = (SomeClass) SerializerUtils.genData(SomeClass.class,
i);
+ kryo.writeObject(output, object);
+ offsets[i] = output.total();
+ }
+ output.close();
+
+ // 2 Read object from every offset
+ ByteBuf byteBuf = isFileMode ? null : outputStream.toByteBuf();
+ for (int i = 0; i < LOOP; i++) {
+ long off = offsets[i];
+ Input input =
+ isFileMode
+ ? new Input(SerInputStream.newInputStream(new File(tmpDir,
"tmp.data"), off))
+ : new Input(SerInputStream.newInputStream(byteBuf, (int) off));
+ for (int j = i + 1; j < LOOP; j++) {
+ SomeClass object = kryo.readObject(input, SomeClass.class);
+ assertEquals(SerializerUtils.genData(SomeClass.class, j), object);
+ }
+ input.close();
+ }
+ if (!isFileMode) {
+ byteBuf.release();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(
+ strings = {
+ "java.lang.String,java.lang.Integer,mem",
+ "java.lang.String,java.lang.Integer,file",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,int,mem",
+
"org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,int,file"
+ })
+ public void testSerDeKeyValues(String classes, @TempDir File tmpDir) throws
Exception {
+ // 1 Construct serializer
+ String[] classArray = classes.split(",");
+ Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+ Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+ boolean isFileMode = classArray[2].equals("file");
+
+ KryoSerializer serializer = new KryoSerializer(rssConf);
+ SerializerInstance instance = serializer.newInstance();
+
+ // 2 Write
+ SerOutputStream outputStream =
+ isFileMode
+ ? new FileSerOutputStream(new File(tmpDir, "tmp.kryo"))
+ : new DynBufferSerOutputStream();
+ SerializationStream serializationStream =
instance.serializeStream(outputStream, false, false);
+ serializationStream.init();
+ long[] offsets = new long[LOOP];
+ for (int i = 0; i < LOOP; i++) {
+ serializationStream.writeRecord(genData(keyClass, i),
genData(valueClass, i));
+ offsets[i] = serializationStream.getTotalBytesWritten();
+ }
+ serializationStream.close();
+
+ // 3 Random read
+ ByteBuf byteBuf = isFileMode ? null : outputStream.toByteBuf();
+ for (int i = 0; i < LOOP; i++) {
+ long off = offsets[i];
+ SerInputStream inputStream =
+ isFileMode
+ ? SerInputStream.newInputStream(new File(tmpDir, "tmp.kryo"),
off)
+ : SerInputStream.newInputStream(byteBuf, (int) off);
+ DeserializationStream deserializationStream =
+ instance.deserializeStream(inputStream, keyClass, valueClass, false,
false);
+ deserializationStream.init();
+ for (int j = i + 1; j < LOOP; j++) {
+ assertTrue(deserializationStream.nextRecord());
+ assertEquals(genData(keyClass, j),
deserializationStream.getCurrentKey());
+ assertEquals(genData(valueClass, j),
deserializationStream.getCurrentValue());
+ }
+ deserializationStream.close();
+ }
+ if (!isFileMode) {
+ byteBuf.release();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(
+ classes = {java.lang.String.class, java.lang.Integer.class,
SomeClass.class, int.class})
+ public void testSerDeObject(Class aClass) throws Exception {
+ KryoSerializer serializer = new KryoSerializer(rssConf);
+ SerializerInstance instance = serializer.newInstance();
+ int number = new Random().nextInt(99999);
+ DataOutputBuffer output = new DataOutputBuffer();
+ instance.serialize(genData(aClass, number), output);
+ DataInputBuffer input = new DataInputBuffer();
+ input.reset(output.getData(), 0, output.getData().length);
+ Object obj = instance.deserialize(input, aClass);
+ assertEquals(genData(aClass, number), obj);
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
index b7fd15520..b0ffc568f 100644
---
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.io.Writable;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.kryo.KryoSerializer;
import org.apache.uniffle.common.serializer.writable.WritableSerializer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,13 +39,22 @@ public class SerializerFactoryTest {
// 1 Test whether it is null
assertNotNull(factory.getSerializer(Writable.class));
assertNotNull(factory.getSerializer(IntWritable.class));
+ assertNotNull(factory.getSerializer(SerializerUtils.SomeClass.class));
+ assertNotNull(factory.getSerializer(String.class));
+ assertNotNull(factory.getSerializer(int.class));
// 2 Check whether the type serializer is right
assertInstanceOf(WritableSerializer.class, factory.getSerializer(Writable.class));
assertInstanceOf(WritableSerializer.class, factory.getSerializer(IntWritable.class));
+ assertInstanceOf(KryoSerializer.class, factory.getSerializer(SerializerUtils.SomeClass.class));
+ assertInstanceOf(KryoSerializer.class, factory.getSerializer(String.class));
+ assertInstanceOf(KryoSerializer.class, factory.getSerializer(int.class));
- // 2 Check whether the serializer is cached
+ // 3 Check whether the serializer is cached
Serializer serializer = factory.getSerializer(Writable.class);
assertEquals(serializer, factory.getSerializer(IntWritable.class));
+ serializer = factory.getSerializer(SerializerUtils.SomeClass.class);
+ assertEquals(serializer, factory.getSerializer(String.class));
+ assertEquals(serializer, factory.getSerializer(int.class));
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
index 809e28a06..e32cdedd0 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
@@ -135,6 +135,10 @@ public class RemoteMergeShuffleWithRssClientTest extends ShuffleReadWriteBase {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTest(String classes) throws Exception {
@@ -302,6 +306,10 @@ public class RemoteMergeShuffleWithRssClientTest extends ShuffleReadWriteBase {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestWithCombine(String classes) throws Exception {
@@ -482,6 +490,10 @@ public class RemoteMergeShuffleWithRssClientTest extends ShuffleReadWriteBase {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestMultiPartition(String classes) throws Exception {
@@ -692,6 +704,10 @@ public class RemoteMergeShuffleWithRssClientTest extends ShuffleReadWriteBase {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestMultiPartitionWithCombine(String classes) throws Exception {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
index 51276e128..0e83ed0cd 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
@@ -149,6 +149,10 @@ public class RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTest(String classes) throws Exception {
@@ -316,6 +320,10 @@ public class RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestWithCombine(String classes) throws Exception {
@@ -497,6 +505,10 @@ public class RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestMultiPartition(String classes) throws Exception {
@@ -707,6 +719,10 @@ public class RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,GRPC_NETTY,false",
+ "java.lang.String,java.lang.Integer,GRPC",
+ "java.lang.String,java.lang.Integer,GRPC_NETTY",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,GRPC_NETTY",
})
@Timeout(10)
public void remoteMergeWriteReadTestMultiPartitionWithCombine(String classes) throws Exception {
diff --git a/pom.xml b/pom.xml
index 5085cfa65..9d96c71ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
<skipITs>${skipTests}</skipITs>
<skipBuildImage>true</skipBuildImage>
<snakeyaml.version>2.2</snakeyaml.version>
+ <kryo.version>4.0.2</kryo.version>
</properties>
<repositories>
@@ -801,13 +802,24 @@
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
- <!-- end -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ <version>${kryo.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.objenesis</groupId>
+ <artifactId>objenesis</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- end -->
</dependencies>
</dependencyManagement>
diff --git a/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
index 3d93d93e2..e2681981f 100644
--- a/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
@@ -67,6 +67,10 @@ public class BlockFlushFileReaderTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,8,false,false",
+ "java.lang.String,java.lang.Integer,2",
+ "java.lang.String,java.lang.Integer,8",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,2",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,8",
})
void writeTestWithMerge(String classes, @TempDir File tmpDir) throws Exception {
final String[] classArray = classes.split(",");
@@ -166,6 +170,8 @@ public class BlockFlushFileReaderTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2,false,false",
+ "java.lang.String,java.lang.Integer,2",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer,2",
})
void writeTestWithMergeWhenInterrupted(String classes, @TempDir File tmpDir) throws Exception {
String[] classArray = classes.split(",");
diff --git a/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
index 66fbeb782..e60821e14 100644
--- a/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
@@ -104,6 +104,8 @@ class MergedResultTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+ "java.lang.String,java.lang.Integer",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
void testMergeSegmentToMergeResult(String classes, @TempDir File tmpDir) throws Exception {
// 1 Parse arguments
diff --git a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
index 0eb4ed1e8..ce2ad6f9b 100644
--- a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
@@ -111,6 +111,8 @@ public class ShuffleMergeManagerTest {
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,true,false",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,true",
"org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,false,false",
+ "java.lang.String,java.lang.Integer",
+ "org.apache.uniffle.common.serializer.SerializerUtils$SomeClass,java.lang.Integer",
})
void testMergerManager(String classes) throws Exception {
// 1 Construct serializer and comparator