This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 847e7a975bfeb94956885cc252285f95afc4a843 Author: Alexey Kudinkin <alexey.kudin...@gmail.com> AuthorDate: Fri Feb 10 15:02:47 2023 -0800 [HUDI-5758] Restoring state of `HoodieKey` to make sure it's binary compatible w/ its state in 0.12 (#7917) RFC-46 modified `HoodieKey` to substantially optimize its serialized footprint (while using Kryo) by making it explicitly serializable by Kryo (inheriting from `KryoSerializable`, making it final). However, this broke its binary compatibility w/ the state as it was in 0.12.2. Unfortunately, since this class is used in `DeleteRecord` w/in `HoodieDeleteBlock`, this also made it impossible to read such blocks created by prior Hudi versions (more details in HUDI-5758). This PR restores the previous state of `HoodieKey` to make sure it stays binary compatible w/ existing persisted `HoodieDeleteBlock`s created by prior Hudi versions. --- .../apache/spark/HoodieSparkKryoRegistrar.scala | 25 +++++++++++++++++-- .../org/apache/hudi/common/model/DeleteRecord.java | 9 +++++++ .../org/apache/hudi/common/model/HoodieKey.java | 28 ++++++++-------------- .../common/table/log/block/HoodieDeleteBlock.java | 2 ++ 4 files changed, 44 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala index 3894065d809..9d7fa3b784f 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala @@ -18,11 +18,12 @@ package org.apache.spark -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} import com.esotericsoftware.kryo.serializers.JavaSerializer import org.apache.hudi.client.model.HoodieInternalRow import 
org.apache.hudi.common.config.SerializableConfiguration -import org.apache.hudi.common.model.HoodieSparkRecord +import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord} import org.apache.hudi.common.util.HoodieCommonKryoRegistrar import org.apache.hudi.config.HoodieWriteConfig import org.apache.spark.serializer.KryoRegistrator @@ -44,12 +45,15 @@ import org.apache.spark.serializer.KryoRegistrator * </ol> */ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator { + override def registerClasses(kryo: Kryo): Unit = { /////////////////////////////////////////////////////////////////////////// // NOTE: DO NOT REORDER REGISTRATIONS /////////////////////////////////////////////////////////////////////////// super[HoodieCommonKryoRegistrar].registerClasses(kryo) + kryo.register(classOf[HoodieKey], new HoodieKeySerializer) + kryo.register(classOf[HoodieWriteConfig]) kryo.register(classOf[HoodieSparkRecord]) @@ -59,6 +63,23 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist // we're relying on [[SerializableConfiguration]] wrapper to work it around kryo.register(classOf[SerializableConfiguration], new JavaSerializer()) } + + /** + * NOTE: This {@link Serializer} could deserialize instance of {@link HoodieKey} serialized + * by implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer} + */ + class HoodieKeySerializer extends Serializer[HoodieKey] { + override def write(kryo: Kryo, output: Output, key: HoodieKey): Unit = { + output.writeString(key.getRecordKey) + output.writeString(key.getPartitionPath) + } + + override def read(kryo: Kryo, input: Input, klass: Class[HoodieKey]): HoodieKey = { + val recordKey = input.readString() + val partitionPath = input.readString() + new HoodieKey(recordKey, partitionPath) + } + } } object HoodieSparkKryoRegistrar { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java index 003b591c20c..296e95e8bfa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java @@ -28,6 +28,15 @@ import java.util.Objects; * we need to keep the ordering val to combine with the data records when merging, or the data loss * may occur if there are intermediate deletions for the inputs * (a new INSERT comes after a DELETE in one input batch). + * + * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING + * + * This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock} to make + * sure this stays backwards-compatible we can't MAKE ANY CHANGES TO THIS CLASS (add, + * delete, reorder or change types of the fields in this class, make class final, etc) + * as this would break its compatibility with already persisted blocks. + * + * Check out HUDI-5760 for more details */ public class DeleteRecord implements Serializable { private static final long serialVersionUID = 1L; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java index 5208cece1cb..436758f96f4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java @@ -18,11 +18,6 @@ package org.apache.hudi.common.model; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoSerializable; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - import java.io.Serializable; import java.util.Objects; @@ -31,8 +26,17 @@ import java.util.Objects; * <p> * - recordKey : a recordKey that acts as primary key for a record. * - partitionPath : the partition path of a record. 
+ * + * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING + * + * This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock} to make + * sure this stays backwards-compatible we can't MAKE ANY CHANGES TO THIS CLASS (add, + * delete, reorder or change types of the fields in this class, make class final, etc) + * as this would break its compatibility with already persisted blocks. + * + * Check out HUDI-5760 for more details */ -public final class HoodieKey implements Serializable, KryoSerializable { +public class HoodieKey implements Serializable { private String recordKey; private String partitionPath; @@ -86,16 +90,4 @@ public final class HoodieKey implements Serializable, KryoSerializable { sb.append('}'); return sb.toString(); } - - @Override - public void write(Kryo kryo, Output output) { - output.writeString(recordKey); - output.writeString(partitionPath); - } - - @Override - public void read(Kryo kryo, Input input) { - this.recordKey = input.readString(); - this.partitionPath = input.readString(); - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java index a5168072d01..d920495f971 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java @@ -68,6 +68,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(baos); + // TODO(HUDI-5760) avoid using Kryo for serialization here byte[] bytesToWrite = SerializationUtils.serialize(getRecordsToDelete()); output.writeInt(version); output.writeInt(bytesToWrite.length); @@ -97,6 +98,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { } } + // TODO(HUDI-5760) avoid using Kryo for serialization here private static 
DeleteRecord[] deserialize(int version, byte[] data) { if (version == 1) { // legacy version