This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 9a1aa0a683 [MINOR] Make sure all `HoodieRecord`s are appropriately serializable by Kryo (#6977)
9a1aa0a683 is described below

commit 9a1aa0a6830b79761a9afa1c469772875d713aa1
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Wed Oct 19 21:16:13 2022 -0700

    [MINOR] Make sure all `HoodieRecord`s are appropriately serializable by Kryo (#6977)
    
    * Make sure `HoodieRecord`, `HoodieKey`, `HoodieRecordLocation` are all `KryoSerializable`
    
    * Revisited `HoodieRecord` serialization hooks to make sure they a) cannot be overridden, and b) provide hooks to properly serialize the record's payload;
    Implemented serialization hooks for `HoodieAvroIndexedRecord`;
    Implemented serialization hooks for `HoodieEmptyRecord`;
    Implemented serialization hooks for `HoodieAvroRecord`;
    
    * Revisited `HoodieSparkRecord` to transiently hold on to the schema so that it could project rows
    
    * Implemented serialization hooks for `HoodieSparkRecord`
    
    * Added `TestHoodieSparkRecord`
    
    * Added tests for Avro-based records
    
    * Added test for `HoodieEmptyRecord`
    
    * Fixed sealing/unsealing for `HoodieRecord` in `HoodieBackedTableMetadataWriter`
    
    * Properly handle deflated records
    
    * Fixed `Row` encoding
    
    * Fixed `HoodieRecord` to be properly sealed/unsealed
    
    * Fixed serialization of the `HoodieRecordGlobalLocation`
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   2 +
 .../hudi/commmon/model/HoodieSparkRecord.java      | 105 ++++++++---
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |   2 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  30 ++++
 .../apache/hudi/common/model/HoodieAvroRecord.java |  16 ++
 .../hudi/common/model/HoodieEmptyRecord.java       |  34 +++-
 .../org/apache/hudi/common/model/HoodieKey.java    |  23 ++-
 .../org/apache/hudi/common/model/HoodieRecord.java |  53 +++++-
 .../common/model/HoodieRecordGlobalLocation.java   |  23 ++-
 .../hudi/common/model/HoodieRecordLocation.java    |  19 +-
 .../sink/partitioner/BucketAssignFunction.java     |   7 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +-
 .../scala/org/apache/hudi/LogFileIterator.scala    |   2 +-
 .../SparkFullBootstrapDataProviderBase.java        |   2 +-
 .../model/TestHoodieRecordSerialization.scala      | 195 +++++++++++++++++++++
 15 files changed, 473 insertions(+), 42 deletions(-)
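
Before the per-file diffs: the hooks listed in the commit message follow a template-method shape, where the Kryo entry points live on the base record class, are kept final, and delegate only the payload portion to subclasses. A minimal, hedged sketch of that shape (the class names `RecordBase` and `StringPayloadRecord` are illustrative only, not Hudi classes; the actual signatures are in the `HoodieRecord.java` hunk further down):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.KryoSerializable;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    // The base class owns the Kryo entry points and keeps them final,
    // so the common fields are always written and read in a fixed order.
    abstract class RecordBase<T> implements KryoSerializable {
      protected T data;

      // Subclasses decide how their payload is written and read back.
      protected abstract void writeRecordPayload(T payload, Kryo kryo, Output output);

      protected abstract T readRecordPayload(Kryo kryo, Input input);

      @Override
      public final void write(Kryo kryo, Output output) {
        // Common fields (key, locations, ...) would be written here first
        writeRecordPayload(data, kryo, output);
      }

      @Override
      public final void read(Kryo kryo, Input input) {
        // Common fields would be read back here, in the same order
        this.data = readRecordPayload(kryo, input);
      }
    }

    // Hypothetical subclass carrying a String payload, for illustration only.
    class StringPayloadRecord extends RecordBase<String> {
      @Override
      protected void writeRecordPayload(String payload, Kryo kryo, Output output) {
        output.writeString(payload);
      }

      @Override
      protected String readRecordPayload(Kryo kryo, Input input) {
        return input.readString();
      }
    }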

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 962875fb92..eda28a5286 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -983,7 +983,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
         FileSlice slice = finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
             fileGroupCount));
+        r.unseal();
+        r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+        r.seal();
         return r;
       });
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index 9cdccbe407..43000d1964 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.commmon.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.hudi.HoodieInternalRowUtils;
 import org.apache.hudi.SparkAdapterSupport$;
@@ -70,9 +74,8 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
 *       need to be updated (ie serving as an overlay layer on top of [[UnsafeRow]])</li>
  * </ul>
  *
-
  */
-public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements KryoSerializable {
 
   /**
   * Record copy operation to avoid double copying. InternalRow do not need to copy twice.
@@ -80,41 +83,58 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   private boolean copy;
 
   /**
-   * We should use this construction method when we read internalRow from file.
-   * The record constructed by this method must be used in iter.
+   * NOTE: {@code HoodieSparkRecord} is holding the schema only in cases when it would have
+   *       to execute {@link UnsafeProjection} so that the {@link InternalRow} it's holding to
+   *       could be projected into {@link UnsafeRow} and be efficiently serialized subsequently
+   *       (by Kryo)
    */
-  public HoodieSparkRecord(InternalRow data) {
+  private final transient StructType schema;
+
+  public HoodieSparkRecord(UnsafeRow data) {
+    this(data, null);
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
     super(null, data);
-    validateRow(data);
+
+    validateRow(data, schema);
     this.copy = false;
+    this.schema = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, UnsafeRow data, boolean copy) {
+    this(key, data, null, copy);
   }
 
-  public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, boolean copy) {
     super(key, data);
-    validateRow(data);
+
+    validateRow(data, schema);
     this.copy = copy;
+    this.schema = schema;
   }
 
-  private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) {
+  private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
     super(key, data, operation);
-    validateRow(data);
 
+    validateRow(data, schema);
     this.copy = copy;
+    this.schema = schema;
   }
 
   @Override
   public HoodieSparkRecord newInstance() {
-    return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy);
+    return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
-    return new HoodieSparkRecord(key, this.data, op, this.copy);
+    return new HoodieSparkRecord(key, this.data, this.schema, op, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key) {
-    return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
+    return new HoodieSparkRecord(key, this.data, this.schema, this.operation, this.copy);
   }
 
   @Override
@@ -155,7 +175,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
     UnsafeProjection projection =
        HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
-    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy);
   }
 
   @Override
@@ -169,7 +189,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     // TODO add actual rewriting
     InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
 
-    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy);
   }
 
   @Override
@@ -184,7 +204,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
         HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
     HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);
 
-    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy);
   }
 
   @Override
@@ -199,7 +219,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
       }
     });
 
-    return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy);
   }
 
   @Override
@@ -264,7 +284,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
       partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
     }
     HoodieKey hoodieKey = new HoodieKey(key, partition);
-    return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
+    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
   }
 
   @Override
@@ -299,6 +319,42 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     }
   }
 
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final void writeRecordPayload(InternalRow payload, Kryo kryo, Output output) {
+    // NOTE: [[payload]] could be null if record has already been deflated
+    UnsafeRow unsafeRow = convertToUnsafeRow(payload, schema);
+
+    kryo.writeObjectOrNull(output, unsafeRow, UnsafeRow.class);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final InternalRow readRecordPayload(Kryo kryo, Input input) {
+    // NOTE: After deserialization every object is allocated on the heap, therefore
+    //       we annotate this object as being copied
+    this.copy = true;
+
+    return kryo.readObjectOrNull(input, UnsafeRow.class);
+  }
+
+  private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType schema) {
+    if (payload == null) {
+      return null;
+    } else if (payload instanceof UnsafeRow) {
+      return (UnsafeRow) payload;
+    }
+
+    UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema);
+    return unsafeProjection.apply(payload);
+  }
+
   private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) {
     if (data instanceof HoodieInternalRow) {
       return (HoodieInternalRow) data;
@@ -351,14 +407,21 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
-    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy);
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
   }
 
-  private static void validateRow(InternalRow data) {
+  private static void validateRow(InternalRow data, StructType schema) {
     // NOTE: [[HoodieSparkRecord]] is expected to hold either
     //          - Instance of [[UnsafeRow]] or
     //          - Instance of [[HoodieInternalRow]] or
     //          - Instance of [[ColumnarBatchRow]]
-    ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+    //
+    //       In case provided row is anything but [[UnsafeRow]], it's expected that the
+    //       corresponding schema has to be provided as well so that it could be properly
+    //       serialized (in case it would need to be)
+    boolean isValid = data instanceof UnsafeRow
+        || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+
+    ValidationUtils.checkState(isValid);
   }
 }
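
The `convertToUnsafeRow` helper above leans on Catalyst's `UnsafeProjection` to turn an arbitrary `InternalRow` into a byte-backed `UnsafeRow` before Kryo writes it out. A standalone sketch of the same idea using plain Spark APIs (assuming Spark 3.x Catalyst internals; this is not Hudi code):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.unsafe.types.UTF8String;

    public class UnsafeRowProjectionSketch {
      public static void main(String[] args) {
        StructType schema = new StructType()
            .add("a", DataTypes.IntegerType)
            .add("b", DataTypes.StringType);

        // A generic (object-backed) InternalRow: cheap to build, but Kryo would
        // have to walk its object graph to serialize it.
        InternalRow generic =
            new GenericInternalRow(new Object[] {1, UTF8String.fromString("test")});

        // Projecting into an UnsafeRow yields a contiguous byte buffer that can
        // be written out directly, which is why the schema must be known here.
        UnsafeProjection projection = UnsafeProjection.create(schema);
        UnsafeRow unsafe = projection.apply(generic);

        System.out.println(unsafe.getSizeInBytes());
      }
    }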
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 8673d2f5ba..92fa653061 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -101,7 +101,7 @@ public class RDDSpatialCurveSortPartitioner<T>
             String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
             String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
             HoodieKey hoodieKey = new HoodieKey(key, partition);
-            return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false);
+            return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, structType, false);
           });
     } else {
       throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 479c8eb9d6..a9e5091429 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -200,4 +204,30 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) {
     return Option.of(this);
   }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final void writeRecordPayload(IndexedRecord payload, Kryo kryo, Output output) {
+    // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to serialize Avro
+    Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);
+
+    kryo.writeObjectOrNull(output, payload, avroSerializer);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
+    // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to serialize Avro
+    Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);
+
+    return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 5cbadece6b..de653054cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -193,4 +196,17 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
       return Option.empty();
     }
   }
+
+  @Override
+  protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
+    // NOTE: Since [[orderingVal]] is polymorphic we have to write out its class
+    //       to be able to properly deserialize it
+    kryo.writeClassAndObject(output, payload);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final T readRecordPayload(Kryo kryo, Input input) {
+    return (T) kryo.readClassAndObject(input);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index eca1ad9a3b..8e80ff7c84 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -18,20 +18,22 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
 
-  private final HoodieRecordType type;
-  private final Comparable<?> orderingVal;
+  private HoodieRecordType type;
+  private Comparable<?> orderingVal;
 
   public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) {
     super(key, null);
@@ -152,4 +154,28 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
+    kryo.writeObject(output, type);
+    // NOTE: Since [[orderingVal]] is polymorphic we have to write out its class
+    //       to be able to properly deserialize it
+    kryo.writeClassAndObject(output, orderingVal);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final T readRecordPayload(Kryo kryo, Input input) {
+    this.type = kryo.readObject(input, HoodieRecordType.class);
+    this.orderingVal = (Comparable<?>) kryo.readClassAndObject(input);
+    // NOTE: [[EmptyRecord]]'s payload is always null
+    return null;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 9030204099..5208cece1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -27,13 +32,13 @@ import java.util.Objects;
  * - recordKey : a recordKey that acts as primary key for a record.
  * - partitionPath : the partition path of a record.
  */
-public class HoodieKey implements Serializable {
+public final class HoodieKey implements Serializable, KryoSerializable {
 
   private String recordKey;
   private String partitionPath;
 
-  public HoodieKey() {
-  }
+  // Required for serializer
+  public HoodieKey() {}
 
   public HoodieKey(String recordKey, String partitionPath) {
     this.recordKey = recordKey;
@@ -81,4 +86,16 @@ public class HoodieKey implements Serializable {
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    output.writeString(recordKey);
+    output.writeString(partitionPath);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    this.recordKey = input.readString();
+    this.partitionPath = input.readString();
+  }
 }
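
With `HoodieKey` now implementing `KryoSerializable`, a bare Kryo round trip exercises the new `write`/`read` methods directly: Kryo instantiates the class through its no-arg constructor and then calls `read`. A small, hedged sketch (assuming the Kryo version bundled with Spark; the explicit `register` call is defensive, since newer Kryo versions require registration by default):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.hudi.common.model.HoodieKey;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    public class HoodieKeyKryoRoundTrip {
      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(HoodieKey.class);

        HoodieKey original = new HoodieKey("rec-key", "part-path");

        // Serialize: Kryo sees KryoSerializable and delegates to HoodieKey#write
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeObject(output, original);
        output.close();

        // Deserialize: the no-arg constructor runs, then HoodieKey#read fills the fields
        Input input = new Input(new ByteArrayInputStream(baos.toByteArray()));
        HoodieKey copy = kryo.readObject(input, HoodieKey.class);
        input.close();

        System.out.println(copy.equals(original)); // expected: true
      }
    }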
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 363092409d..778186d4bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.common.model;
 
-import java.util.Collections;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -29,18 +31,19 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * A Single Record managed by Hoodie.
  */
-public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterface, Serializable {
+public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterface, KryoSerializable, Serializable {
 
  public static final String COMMIT_TIME_METADATA_FIELD = HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName();
  public static final String COMMIT_SEQNO_METADATA_FIELD = HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName();
@@ -158,8 +161,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     this.sealed = record.sealed;
   }
 
-  public HoodieRecord() {
-  }
+  public HoodieRecord() {}
 
   public abstract HoodieRecord<T> newInstance();
 
@@ -282,6 +284,45 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     }
   }
 
+  protected abstract void writeRecordPayload(T payload, Kryo kryo, Output output);
+
+  protected abstract T readRecordPayload(Kryo kryo, Input input);
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  public final void write(Kryo kryo, Output output) {
+    kryo.writeObjectOrNull(output, key, HoodieKey.class);
+    kryo.writeObjectOrNull(output, operation, HoodieOperation.class);
+    // NOTE: We have to write actual class along with the object here,
+    //       since [[HoodieRecordLocation]] has inheritors
+    kryo.writeClassAndObject(output, currentLocation);
+    kryo.writeClassAndObject(output, newLocation);
+    // NOTE: Writing out actual record payload is relegated to the actual
+    //       implementation
+    writeRecordPayload(data, kryo, output);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  public final void read(Kryo kryo, Input input) {
+    this.key = kryo.readObjectOrNull(input, HoodieKey.class);
+    this.operation = kryo.readObjectOrNull(input, HoodieOperation.class);
+    this.currentLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
+    this.newLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
+    // NOTE: Reading out actual record payload is relegated to the actual
+    //       implementation
+    this.data = readRecordPayload(kryo, input);
+
+    // NOTE: We're always seal object after deserialization
+    this.sealed = true;
+  }
+
   /**
    * Get column in record to support RDDCustomColumnsSortPartitioner
    * @return
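
The final `write`/`read` pair added above is what Spark's Kryo serializer invokes whenever a `HoodieRecord` is shuffled, cached, or spilled in serialized form. A hedged configuration sketch (standard Spark settings, not part of this commit; registering the classes is optional and only trims the class names written on the wire):

    import org.apache.spark.SparkConf;

    public class KryoConfSketch {
      public static SparkConf kryoConf() {
        return new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // Optional: pre-register the classes this commit makes Kryo-serializable
            .registerKryoClasses(new Class<?>[] {
                org.apache.hudi.common.model.HoodieKey.class,
                org.apache.hudi.common.model.HoodieRecordLocation.class,
                org.apache.hudi.common.model.HoodieRecordGlobalLocation.class
            });
      }
    }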
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
index f469a1ab45..8c021d902a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -18,18 +18,21 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.util.Objects;
 
 /**
 * Similar with {@link org.apache.hudi.common.model.HoodieRecordLocation} but with partition path.
  */
-public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
+public final class HoodieRecordGlobalLocation extends HoodieRecordLocation {
   private static final long serialVersionUID = 1L;
 
   private String partitionPath;
 
-  public HoodieRecordGlobalLocation() {
-  }
+  public HoodieRecordGlobalLocation() {}
 
   public HoodieRecordGlobalLocation(String partitionPath, String instantTime, String fileId) {
     super(instantTime, fileId);
@@ -93,5 +96,19 @@ public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
   public HoodieRecordGlobalLocation copy(String partitionPath) {
     return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
   }
+
+  @Override
+  public final void write(Kryo kryo, Output output) {
+    super.write(kryo, output);
+
+    kryo.writeObjectOrNull(output, partitionPath, String.class);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    super.read(kryo, input);
+
+    this.partitionPath = kryo.readObject(input, String.class);
+  }
 }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 2b1feab39b..8b1dd2b378 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -18,13 +18,18 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.io.Serializable;
 import java.util.Objects;
 
 /**
 * Location of a HoodieRecord within the partition it belongs to. Ultimately, this points to an actual file on disk
  */
-public class HoodieRecordLocation implements Serializable {
+public class HoodieRecordLocation implements Serializable, KryoSerializable {
 
   protected String instantTime;
   protected String fileId;
@@ -78,4 +83,16 @@ public class HoodieRecordLocation implements Serializable {
   public void setFileId(String fileId) {
     this.fileId = fileId;
   }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    output.writeString(instantTime);
+    output.writeString(fileId);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    this.instantTime = input.readString();
+    this.fileId = input.readString();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 89f89cf5c0..550016a236 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -184,8 +184,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
           // then update the index state using location with new partition path.
           HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
               payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
+
+          deleteRecord.unseal();
           deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
           deleteRecord.seal();
+
           out.collect((O) deleteRecord);
         }
         location = getNewRecordLocation(partitionPath);
@@ -200,7 +203,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     if (isChangingRecords) {
       updateIndexState(partitionPath, location);
     }
+
+    record.unseal();
     record.setCurrentLocation(location);
+    record.seal();
+
     out.collect((O) record);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99313884c3..2ae18bd860 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -877,7 +877,7 @@ object HoodieSparkSqlWriter {
             val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
             val key = new HoodieKey(recordKey.toString, partitionPath.toString)
 
-            new HoodieSparkRecord(key, processedRow, false)
+            new HoodieSparkRecord(key, processedRow, structType, false)
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 256bc14e82..6ee5856d6a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -238,7 +238,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
     //       on the record from the Delta Log
     recordMerger.getRecordType match {
       case HoodieRecordType.SPARK =>
-        val curRecord = new HoodieSparkRecord(curRow)
+        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
         val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
         toScalaOption(result)
           .map(r => {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index dc408ee919..e666c89306 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -96,7 +96,7 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
           String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
           String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
           HoodieKey key = new HoodieKey(recordKey, partitionPath);
-          return new HoodieSparkRecord(key, internalRow, false);
+          return new HoodieSparkRecord(key, internalRow, structType, false);
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
new file mode 100644
index 0000000000..8329fda093
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, createInternalRowToAvroConverter}
+import org.apache.hudi.{HoodieInternalRowUtils, SparkAdapterSupport}
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.TestHoodieRecordSerialization.{OverwriteWithLatestAvroPayloadWithEquality, cloneUsingKryo, convertToAvroRecord, toUnsafeRow}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.objects.SerializerSupport
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeRow}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.addMetaFields
+import org.apache.spark.sql.types.{Decimal, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import java.util.Objects
+
+class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
+
+  private val rowSchema = StructType.fromDDL("a INT, b STRING, c DATE, d TIMESTAMP, e STRUCT<a: DECIMAL(3, 2)>")
+
+  @Test
+  def testSparkRecord(): Unit = {
+    def routine(row: InternalRow, schema: StructType, serializedSize: Int): Unit = {
+      val record = row match {
+        case ur: UnsafeRow => new HoodieSparkRecord(ur)
+        case _ => new HoodieSparkRecord(row, schema)
+      }
+
+      // Step 1: Serialize/de- original [[HoodieSparkRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(serializedSize, originalBytes.length)
+      // NOTE: That in case when original row isn't an instance of [[UnsafeRow]]
+      //       it would be
+      //         - Projected into [[UnsafeRow]] (prior to serialization by Kryo)
+      //         - Re-constructed as [[UnsafeRow]]
+      row match {
+        case _: UnsafeRow => assertEquals(record, cloned)
+        case _ =>
+          val convertedRecord = new HoodieSparkRecord(toUnsafeRow(row, schema))
+          assertEquals(convertedRecord, cloned)
+      }
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val row = Row(1, "test", Date.valueOf(LocalDate.of(2022, 10, 1)),
+      Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")), Row(Decimal.apply(123, 3, 2)))
+
+    val unsafeRow: UnsafeRow = toUnsafeRow(row, rowSchema)
+    val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5), unsafeRow, false)
+
+    Seq(
+      (unsafeRow, rowSchema, 135),
+      (hoodieInternalRow, addMetaFields(rowSchema), 175)
+    ) foreach { case (row, schema, expectedSize) => routine(row, schema, expectedSize) }
+  }
+
+  @Test
+  def testAvroRecords(): Unit = {
+    def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+      // Step 1: Serialize/de- original [[HoodieRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(expectedSize, originalBytes.length)
+      assertEquals(record, cloned)
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val row = new GenericRowWithSchema(Array(1, "test", Date.valueOf(LocalDate.of(2022, 10, 1)),
+      Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")), Row(Decimal.apply(123, 3, 2))), rowSchema)
+    val avroRecord = convertToAvroRecord(row)
+
+    val key = new HoodieKey("rec-key", "part-path")
+
+    val legacyRecord = toLegacyAvroRecord(avroRecord, key)
+    val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
+
+    Seq(
+      (legacyRecord, 573),
+      (avroIndexedRecord, 442)
+    ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+  }
+
+  @Test
+  def testEmptyRecord(): Unit = {
+    def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+      // Step 1: Serialize/de- original [[HoodieRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(expectedSize, originalBytes.length)
+      assertEquals(record, cloned)
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val key = new HoodieKey("rec-key", "part-path")
+
+    Seq(
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1, HoodieRecordType.AVRO), 74),
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 2, HoodieRecordType.SPARK), 74)
+    ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+  }
+
+
+  private def toLegacyAvroRecord(avroRecord: GenericRecord, key: HoodieKey): HoodieAvroRecord[OverwriteWithLatestAvroPayload] = {
+    val avroRecordPayload = new OverwriteWithLatestAvroPayloadWithEquality(avroRecord, 0)
+    val legacyRecord = new HoodieAvroRecord[OverwriteWithLatestAvroPayload](key, avroRecordPayload)
+
+    legacyRecord
+  }
+}
+
+object TestHoodieRecordSerialization {
+
+  private def cloneUsingKryo[T](r: HoodieRecord[T]): (HoodieRecord[T], Array[Byte]) = {
+    val serializer = SerializerSupport.newSerializer(true)
+
+    val buf = serializer.serialize(r)
+    val cloned: HoodieRecord[T] = serializer.deserialize(buf)
+
+    val bytes = new Array[Byte](buf.remaining())
+    buf.get(bytes)
+
+    (cloned, bytes)
+  }
+
+  private def toUnsafeRow(row: InternalRow, schema: StructType): UnsafeRow = {
+    val project = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema)
+    project(row)
+  }
+
+  private def toUnsafeRow(row: Row, schema: StructType): UnsafeRow = {
+    val encoder = SparkAdapterSupport.sparkAdapter.createSparkRowSerDe(schema)
+    val internalRow = encoder.serializeRow(row)
+    internalRow.asInstanceOf[UnsafeRow]
+  }
+
+  private def convertToAvroRecord(row: Row): GenericRecord = {
+    val schema = convertStructTypeToAvroSchema(row.schema, "testRecord", "testNamespace")
+
+    createInternalRowToAvroConverter(row.schema, schema, nullable = false)
+      .apply(toUnsafeRow(row, row.schema))
+  }
+
+  class OverwriteWithLatestAvroPayloadWithEquality(avroRecord: GenericRecord, _orderingVal: Comparable[_])
+    extends OverwriteWithLatestAvroPayload(avroRecord, _orderingVal) {
+    override def equals(obj: Any): Boolean =
+      obj match {
+        case p: OverwriteWithLatestAvroPayloadWithEquality =>
+          Objects.equals(ByteBuffer.wrap(this.recordBytes), ByteBuffer.wrap(p.recordBytes)) &&
+            Objects.equals(this.orderingVal, p.orderingVal)
+        case _ =>
+          false
+      }
+
+    override def hashCode(): Int = Objects.hash(avroRecord, _orderingVal.asInstanceOf[AnyRef])
+  }
+
+}

