This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 9a1aa0a683  [MINOR] Make sure all `HoodieRecord`s are appropriately serializable by Kryo (#6977)
9a1aa0a683 is described below

commit 9a1aa0a6830b79761a9afa1c469772875d713aa1
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Wed Oct 19 21:16:13 2022 -0700

    [MINOR] Make sure all `HoodieRecord`s are appropriately serializable by Kryo (#6977)

    * Made sure `HoodieRecord`, `HoodieKey` and `HoodieRecordLocation` are all `KryoSerializable`

    * Revisited `HoodieRecord` serialization hooks to make sure they a) cannot be overridden and
      b) provide hooks to properly serialize the record's payload; implemented those hooks for
      `HoodieAvroIndexedRecord`, `HoodieEmptyRecord` and `HoodieAvroRecord`

    * Revisited `HoodieSparkRecord` to transiently hold on to the schema so that it can project its row

    * Implemented serialization hooks for `HoodieSparkRecord`

    * Added `TestHoodieSparkRecord`

    * Added tests for Avro-based records

    * Added a test for `HoodieEmptyRecord`

    * Fixed sealing/unsealing of `HoodieRecord`s in `HoodieBackedTableMetadataWriter`

    * Properly handle deflated records

    * Fixed `Row` encoding

    * Fixed `HoodieRecord` to be properly sealed/unsealed

    * Fixed serialization of `HoodieRecordGlobalLocation`
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   2 +
 .../hudi/commmon/model/HoodieSparkRecord.java      | 105 ++++++++---
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |   2 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  30 ++++
 .../apache/hudi/common/model/HoodieAvroRecord.java |  16 ++
 .../hudi/common/model/HoodieEmptyRecord.java       |  34 +++-
 .../org/apache/hudi/common/model/HoodieKey.java    |  23 ++-
 .../org/apache/hudi/common/model/HoodieRecord.java |  53 +++++-
 .../common/model/HoodieRecordGlobalLocation.java   |  23 ++-
 .../hudi/common/model/HoodieRecordLocation.java    |  19 +-
 .../sink/partitioner/BucketAssignFunction.java     |   7 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +-
 .../scala/org/apache/hudi/LogFileIterator.scala    |   2 +-
 .../SparkFullBootstrapDataProviderBase.java        |   2 +-
 .../model/TestHoodieRecordSerialization.scala      | 195 +++++++++++++++++++++
 15 files changed, 473 insertions(+), 42 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 962875fb92..eda28a5286 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -983,7 +983,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
       FileSlice slice = finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), fileGroupCount));
+      r.unseal();
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+      r.seal();
       return r;
     });
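For context on what the `KryoSerializable` contract adopted throughout this change implies: Kryo dispatches to the record's own `write(Kryo, Output)` / `read(Kryo, Input)` hooks instead of reflective field-by-field serialization. A minimal, hypothetical round-trip sketch (standalone driver code, not part of this commit; assumes the Kryo 4.x API that Spark bundles):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.objenesis.strategy.StdInstantiatorStrategy;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    public class KryoRoundTripSketch {
      // Serializes and deserializes a record the way a Kryo-enabled Spark shuffle
      // would; Kryo dispatches to the record's write(kryo, output) / read(kryo, input)
      // hooks because the record implements KryoSerializable
      public static <T> HoodieRecord<T> roundTrip(HoodieRecord<T> record) {
        Kryo kryo = new Kryo();
        // Allows instantiating record classes that lack a no-arg constructor
        // (Spark configures its Kryo instances similarly)
        kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (Output output = new Output(baos)) {
          // The class tag is needed since HoodieRecord has several implementations
          kryo.writeClassAndObject(output, record);
        }
        try (Input input = new Input(new ByteArrayInputStream(baos.toByteArray()))) {
          @SuppressWarnings("unchecked")
          HoodieRecord<T> cloned = (HoodieRecord<T>) kryo.readClassAndObject(input);
          return cloned;
        }
      }
    }

Spark executors perform the equivalent of this whenever records cross a shuffle boundary with the Kryo serializer enabled.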
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index 9cdccbe407..43000d1964 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.commmon.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.hudi.HoodieInternalRowUtils;
 import org.apache.hudi.SparkAdapterSupport$;
@@ -70,9 +74,8 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
  *   need to be updated (ie serving as an overlay layer on top of [[UnsafeRow]])</li>
  * </ul>
- *
  */
-public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements KryoSerializable {
 
   /**
    * Record copy operation to avoid double copying. InternalRow do not need to copy twice.
@@ -80,41 +83,58 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   private boolean copy;
 
   /**
-   * We should use this construction method when we read internalRow from file.
-   * The record constructed by this method must be used in iter.
+   * NOTE: {@code HoodieSparkRecord} is holding the schema only in cases when it would have
+   *       to execute {@link UnsafeProjection} so that the {@link InternalRow} it's holding to
+   *       could be projected into {@link UnsafeRow} and be efficiently serialized subsequently
+   *       (by Kryo)
    */
-  public HoodieSparkRecord(InternalRow data) {
+  private final transient StructType schema;
+
+  public HoodieSparkRecord(UnsafeRow data) {
+    this(data, null);
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
     super(null, data);
-    validateRow(data);
+
+    validateRow(data, schema);
     this.copy = false;
+    this.schema = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, UnsafeRow data, boolean copy) {
+    this(key, data, null, copy);
   }
 
-  public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, boolean copy) {
     super(key, data);
-    validateRow(data);
+
+    validateRow(data, schema);
     this.copy = copy;
+    this.schema = schema;
   }
 
-  private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) {
+  private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
     super(key, data, operation);
-    validateRow(data);
+
+    validateRow(data, schema);
     this.copy = copy;
+    this.schema = schema;
   }
 
   @Override
   public HoodieSparkRecord newInstance() {
-    return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy);
+    return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
-    return new HoodieSparkRecord(key, this.data, op, this.copy);
+    return new HoodieSparkRecord(key, this.data, this.schema, op, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key) {
-    return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
+    return new HoodieSparkRecord(key, this.data, this.schema, this.operation, this.copy);
   }
 
   @Override
@@ -155,7 +175,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
     UnsafeProjection projection =
         HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
-    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy);
   }
 
   @Override
@@ -169,7 +189,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
     // TODO add actual rewriting
     InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
-    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy);
   }
 
   @Override
@@ -184,7 +204,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
         HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
     HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);
-    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy);
   }
 
   @Override
@@ -199,7 +219,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
       }
     });
 
-    return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy);
   }
 
   @Override
@@ -264,7 +284,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
       partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
     }
     HoodieKey hoodieKey = new HoodieKey(key, partition);
-    return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
+    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
   }
 
   @Override
@@ -299,6 +319,42 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     }
   }
 
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final void writeRecordPayload(InternalRow payload, Kryo kryo, Output output) {
+    // NOTE: [[payload]] could be null if record has already been deflated
+    UnsafeRow unsafeRow = convertToUnsafeRow(payload, schema);
+
+    kryo.writeObjectOrNull(output, unsafeRow, UnsafeRow.class);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final InternalRow readRecordPayload(Kryo kryo, Input input) {
+    // NOTE: After deserialization every object is allocated on the heap, therefore
+    //       we annotate this object as being copied
+    this.copy = true;
+
+    return kryo.readObjectOrNull(input, UnsafeRow.class);
+  }
+
+  private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType schema) {
+    if (payload == null) {
+      return null;
+    } else if (payload instanceof UnsafeRow) {
+      return (UnsafeRow) payload;
+    }
+
+    UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema);
+    return unsafeProjection.apply(payload);
+  }
+
   private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) {
     if (data instanceof HoodieInternalRow) {
       return (HoodieInternalRow) data;
@@ -351,14 +407,21 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD))
         : null;
-    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy);
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
   }
 
-  private static void validateRow(InternalRow data) {
+  private static void validateRow(InternalRow data, StructType schema) {
     // NOTE: [[HoodieSparkRecord]] is expected to hold either
     //          - Instance of [[UnsafeRow]] or
     //          - Instance of [[HoodieInternalRow]] or
     //          - Instance of [[ColumnarBatchRow]]
-    ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+    //
+    //       In case provided row is anything but [[UnsafeRow]], it's expected that the
+    //       corresponding schema has to be provided as well so that it could be properly
+    //       serialized (in case it would need to be)
+    boolean isValid = data instanceof UnsafeRow
+        || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+
+    ValidationUtils.checkState(isValid);
   }
 }
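Worth noting about the `HoodieSparkRecord` changes above: the transient schema exists solely so that a non-`UnsafeRow` payload can be projected into an `UnsafeRow` (a contiguous byte region that serializes cheaply) before Kryo writes it out, which is also why `validateRow` insists on a schema for anything that isn't already an `UnsafeRow`. A standalone sketch of that projection step, using Spark's catalyst `UnsafeProjection` directly rather than Hudi's cached-projection helper (the helper class name here is illustrative, not from the commit):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    import org.apache.spark.sql.types.StructType;

    public class UnsafeRowProjectionSketch {
      // Projects an arbitrary InternalRow into an UnsafeRow, mirroring what
      // convertToUnsafeRow(...) above does via Hudi's cached-projection helper
      public static UnsafeRow toUnsafeRow(InternalRow row, StructType schema) {
        if (row instanceof UnsafeRow) {
          // Already backed by a contiguous byte region, nothing to do
          return (UnsafeRow) row;
        }
        // Code-generates a projection copying the row's fields into a single
        // UnsafeRow buffer; this is the reason the schema has to be known
        UnsafeProjection projection = UnsafeProjection.create(schema);
        return projection.apply(row);
      }
    }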
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 8673d2f5ba..92fa653061 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -101,7 +101,7 @@ public class RDDSpatialCurveSortPartitioner<T>
         String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
         String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
         HoodieKey hoodieKey = new HoodieKey(key, partition);
-        return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false);
+        return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, structType, false);
       });
     } else {
       throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 479c8eb9d6..a9e5091429 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -200,4 +204,30 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) {
     return Option.of(this);
   }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final void writeRecordPayload(IndexedRecord payload, Kryo kryo, Output output) {
+    // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to serialize Avro
+    Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);
+
+    kryo.writeObjectOrNull(output, payload, avroSerializer);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
+    // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to serialize Avro
+    Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);
+
+    return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 5cbadece6b..de653054cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -193,4 +196,17 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
       return Option.empty();
     }
   }
+
+  @Override
+  protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
+    // NOTE: Since [[payload]] is polymorphic we have to write out its class
+    //       to be able to properly deserialize it
+    kryo.writeClassAndObject(output, payload);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final T readRecordPayload(Kryo kryo, Input input) {
+    return (T) kryo.readClassAndObject(input);
+  }
 }
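The pattern in the two Avro implementations above is worth spelling out: `writeObject`/`writeObjectOrNull` write only the field bytes, because the concrete class is implied at the read site, whereas `writeClassAndObject` additionally writes a class tag, which is required whenever the declared type (like `HoodieAvroRecord`'s payload) has many possible runtime classes. A hypothetical side-by-side sketch (names are illustrative only):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class ClassTagSketch {
      // Monomorphic field: the read site names the concrete class, so only
      // the field bytes are written
      public static void writeKnownType(Kryo kryo, Output output, String value) {
        kryo.writeObjectOrNull(output, value, String.class);
      }

      public static String readKnownType(Kryo kryo, Input input) {
        return kryo.readObjectOrNull(input, String.class);
      }

      // Polymorphic field: the runtime class (Integer, Long, a custom payload, ...)
      // cannot be known at the read site, so a class tag travels with the bytes
      public static void writePolymorphic(Kryo kryo, Output output, Comparable<?> orderingVal) {
        kryo.writeClassAndObject(output, orderingVal);
      }

      public static Comparable<?> readPolymorphic(Kryo kryo, Input input) {
        return (Comparable<?>) kryo.readClassAndObject(input);
      }
    }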
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index eca1ad9a3b..8e80ff7c84 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -18,20 +18,22 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
 
-  private final HoodieRecordType type;
-  private final Comparable<?> orderingVal;
+  private HoodieRecordType type;
+  private Comparable<?> orderingVal;
 
   public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) {
     super(key, null);
@@ -152,4 +154,28 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
+    kryo.writeObject(output, type);
+    // NOTE: Since [[orderingVal]] is polymorphic we have to write out its class
+    //       to be able to properly deserialize it
+    kryo.writeClassAndObject(output, orderingVal);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  protected final T readRecordPayload(Kryo kryo, Input input) {
+    this.type = kryo.readObject(input, HoodieRecordType.class);
+    this.orderingVal = (Comparable<?>) kryo.readClassAndObject(input);
+    // NOTE: [[HoodieEmptyRecord]]'s payload is always null
+    return null;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 9030204099..5208cece1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -27,13 +32,13 @@ import java.util.Objects;
  *  - recordKey : a recordKey that acts as primary key for a record.
  *  - partitionPath : the partition path of a record.
  */
-public class HoodieKey implements Serializable {
+public final class HoodieKey implements Serializable, KryoSerializable {
 
   private String recordKey;
   private String partitionPath;
 
-  public HoodieKey() {
-  }
+  // Required for serializer
+  public HoodieKey() {}
 
   public HoodieKey(String recordKey, String partitionPath) {
     this.recordKey = recordKey;
@@ -81,4 +86,16 @@ public class HoodieKey implements Serializable {
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    output.writeString(recordKey);
+    output.writeString(partitionPath);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    this.recordKey = input.readString();
+    this.partitionPath = input.readString();
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 363092409d..778186d4bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.common.model;
 
-import java.util.Collections;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -29,18 +31,19 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * A Single Record managed by Hoodie.
  */
-public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterface, Serializable {
+public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterface, KryoSerializable, Serializable {
 
   public static final String COMMIT_TIME_METADATA_FIELD = HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName();
   public static final String COMMIT_SEQNO_METADATA_FIELD = HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName();
@@ -158,8 +161,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     this.sealed = record.sealed;
   }
 
-  public HoodieRecord() {
-  }
+  public HoodieRecord() {}
 
   public abstract HoodieRecord<T> newInstance();
 
@@ -282,6 +284,45 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     }
   }
 
+  protected abstract void writeRecordPayload(T payload, Kryo kryo, Output output);
+
+  protected abstract T readRecordPayload(Kryo kryo, Input input);
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  public final void write(Kryo kryo, Output output) {
+    kryo.writeObjectOrNull(output, key, HoodieKey.class);
+    kryo.writeObjectOrNull(output, operation, HoodieOperation.class);
+    // NOTE: We have to write out the actual class along with the object here,
+    //       since [[HoodieRecordLocation]] has inheritors
+    kryo.writeClassAndObject(output, currentLocation);
+    kryo.writeClassAndObject(output, newLocation);
+    // NOTE: Writing out the actual record payload is relegated to the particular
+    //       implementation
+    writeRecordPayload(data, kryo, output);
+  }
+
+  /**
+   * NOTE: This method is declared final to make sure there's no polymorphism and therefore
+   *       JIT compiler could perform more aggressive optimizations
+   */
+  @Override
+  public final void read(Kryo kryo, Input input) {
+    this.key = kryo.readObjectOrNull(input, HoodieKey.class);
+    this.operation = kryo.readObjectOrNull(input, HoodieOperation.class);
+    this.currentLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
+    this.newLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
+    // NOTE: Reading the actual record payload is relegated to the particular
+    //       implementation
+    this.data = readRecordPayload(kryo, input);
+
+    // NOTE: We always seal the object after deserialization
+    this.sealed = true;
+  }
+
   /**
    * Get column in record to support RDDCustomColumnsSortPartitioner
    * @return
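The base-class hooks above amount to a template-method split: the final `HoodieRecord.write`/`read` fix the envelope (key, operation, both locations, sealing) once and for all, and subclasses supply only the payload encoding. A self-contained sketch of the same split using simplified stand-in types (none of these names are from the commit):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.KryoSerializable;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    // Simplified stand-in for HoodieRecord illustrating the template-method
    // split: the envelope is final in the base class, the payload is a hook
    abstract class RecordBase<T> implements KryoSerializable {
      protected String recordKey;
      protected T data;
      protected boolean sealed;

      protected abstract void writePayload(T payload, Kryo kryo, Output output);

      protected abstract T readPayload(Kryo kryo, Input input);

      @Override
      public final void write(Kryo kryo, Output output) {
        output.writeString(recordKey);    // envelope, identical for all subclasses
        writePayload(data, kryo, output); // payload, subclass-specific
      }

      @Override
      public final void read(Kryo kryo, Input input) {
        this.recordKey = input.readString();
        this.data = readPayload(kryo, input);
        this.sealed = true; // records always come out of deserialization sealed
      }
    }

    final class StringRecord extends RecordBase<String> {
      @Override
      protected void writePayload(String payload, Kryo kryo, Output output) {
        output.writeString(payload);
      }

      @Override
      protected String readPayload(Kryo kryo, Input input) {
        return input.readString();
      }
    }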
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
index f469a1ab45..8c021d902a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -18,18 +18,21 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.util.Objects;
 
 /**
  * Similar with {@link org.apache.hudi.common.model.HoodieRecordLocation} but with partition path.
  */
-public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
+public final class HoodieRecordGlobalLocation extends HoodieRecordLocation {
   private static final long serialVersionUID = 1L;
 
   private String partitionPath;
 
-  public HoodieRecordGlobalLocation() {
-  }
+  public HoodieRecordGlobalLocation() {}
 
   public HoodieRecordGlobalLocation(String partitionPath, String instantTime, String fileId) {
     super(instantTime, fileId);
@@ -93,5 +96,19 @@ public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
   public HoodieRecordGlobalLocation copy(String partitionPath) {
     return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
   }
+
+  @Override
+  public final void write(Kryo kryo, Output output) {
+    super.write(kryo, output);
+
+    kryo.writeObjectOrNull(output, partitionPath, String.class);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    super.read(kryo, input);
+
+    this.partitionPath = kryo.readObject(input, String.class);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 2b1feab39b..8b1dd2b378 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -18,13 +18,18 @@
 
 package org.apache.hudi.common.model;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import java.io.Serializable;
 import java.util.Objects;
 
 /**
  * Location of a HoodieRecord within the partition it belongs to. Ultimately, this points to an actual file on disk
 */
-public class HoodieRecordLocation implements Serializable {
+public class HoodieRecordLocation implements Serializable, KryoSerializable {
 
   protected String instantTime;
   protected String fileId;
@@ -78,4 +83,16 @@ public class HoodieRecordLocation implements Serializable {
   public void setFileId(String fileId) {
     this.fileId = fileId;
   }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    output.writeString(instantTime);
+    output.writeString(fileId);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    this.instantTime = input.readString();
+    this.fileId = input.readString();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 89f89cf5c0..550016a236 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -184,8 +184,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
         // then update the index state using location with new partition path.
         HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
             payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
+
+        deleteRecord.unseal();
         deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
         deleteRecord.seal();
+
         out.collect((O) deleteRecord);
       }
       location = getNewRecordLocation(partitionPath);
@@ -200,7 +203,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     if (isChangingRecords) {
       updateIndexState(partitionPath, location);
    }
+
+    record.unseal();
     record.setCurrentLocation(location);
+    record.seal();
+
     out.collect((O) record);
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99313884c3..2ae18bd860 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -877,7 +877,7 @@ object HoodieSparkSqlWriter {
             val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
             val key = new HoodieKey(recordKey.toString, partitionPath.toString)
 
-            new HoodieSparkRecord(key, processedRow, false)
+            new HoodieSparkRecord(key, processedRow, structType, false)
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 256bc14e82..6ee5856d6a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -238,7 +238,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
     //       on the record from the Delta Log
     recordMerger.getRecordType match {
       case HoodieRecordType.SPARK =>
-        val curRecord = new HoodieSparkRecord(curRow)
+        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
         val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
         toScalaOption(result)
           .map(r => {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index dc408ee919..e666c89306 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -96,7 +96,7 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
           String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
           String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
           HoodieKey key = new HoodieKey(recordKey, partitionPath);
-          return new HoodieSparkRecord(key, internalRow, false);
+          return new HoodieSparkRecord(key, internalRow, structType, false);
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
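The `unseal()`/`seal()` bracketing added above follows `HoodieRecord`'s mutation guard: a sealed record is expected to reject mutation, so every `setCurrentLocation` call has to be wrapped. A simplified stand-in showing the assumed guard that makes the bracketing necessary (the exact check and exception type are assumptions, not shown in this diff):

    import org.apache.hudi.common.model.HoodieRecordLocation;

    class SealedExample {
      private boolean sealed = true;
      private HoodieRecordLocation currentLocation;

      void unseal() { this.sealed = false; }

      void seal() { this.sealed = true; }

      void setCurrentLocation(HoodieRecordLocation location) {
        if (sealed) {
          // HoodieRecord is assumed to perform an equivalent state check
          // before any mutation
          throw new UnsupportedOperationException("Not allowed to modify a sealed record");
        }
        this.currentLocation = location;
      }
    }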
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
new file mode 100644
index 0000000000..8329fda093
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, createInternalRowToAvroConverter}
+import org.apache.hudi.{HoodieInternalRowUtils, SparkAdapterSupport}
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.TestHoodieRecordSerialization.{OverwriteWithLatestAvroPayloadWithEquality, cloneUsingKryo, convertToAvroRecord, toUnsafeRow}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.objects.SerializerSupport
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeRow}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.addMetaFields
+import org.apache.spark.sql.types.{Decimal, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import java.util.Objects
+
+class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
+
+  private val rowSchema = StructType.fromDDL("a INT, b STRING, c DATE, d TIMESTAMP, e STRUCT<a: DECIMAL(3, 2)>")
+
+  @Test
+  def testSparkRecord(): Unit = {
+    def routine(row: InternalRow, schema: StructType, serializedSize: Int): Unit = {
+      val record = row match {
+        case ur: UnsafeRow => new HoodieSparkRecord(ur)
+        case _ => new HoodieSparkRecord(row, schema)
+      }
+
+      // Step 1: Serialize/de- original [[HoodieSparkRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(serializedSize, originalBytes.length)
+      // NOTE: That in case when original row isn't an instance of [[UnsafeRow]]
+      //       it would be
+      //         - Projected into [[UnsafeRow]] (prior to serialization by Kryo)
+      //         - Re-constructed as [[UnsafeRow]]
+      row match {
+        case _: UnsafeRow => assertEquals(record, cloned)
+        case _ =>
+          val convertedRecord = new HoodieSparkRecord(toUnsafeRow(row, schema))
+          assertEquals(convertedRecord, cloned)
+      }
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val row = Row(1, "test", Date.valueOf(LocalDate.of(2022, 10, 1)),
+      Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")), Row(Decimal.apply(123, 3, 2)))
+
+    val unsafeRow: UnsafeRow = toUnsafeRow(row, rowSchema)
+    val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5), unsafeRow, false)
+
+    Seq(
+      (unsafeRow, rowSchema, 135),
+      (hoodieInternalRow, addMetaFields(rowSchema), 175)
+    ) foreach { case (row, schema, expectedSize) => routine(row, schema, expectedSize) }
+  }
+
+  @Test
+  def testAvroRecords(): Unit = {
+    def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+      // Step 1: Serialize/de- original [[HoodieRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(expectedSize, originalBytes.length)
+      assertEquals(record, cloned)
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val row = new GenericRowWithSchema(Array(1, "test", Date.valueOf(LocalDate.of(2022, 10, 1)),
+      Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")), Row(Decimal.apply(123, 3, 2))), rowSchema)
+    val avroRecord = convertToAvroRecord(row)
+
+    val key = new HoodieKey("rec-key", "part-path")
+
+    val legacyRecord = toLegacyAvroRecord(avroRecord, key)
+    val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
+
+    Seq(
+      (legacyRecord, 573),
+      (avroIndexedRecord, 442)
+    ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+  }
+
+  @Test
+  def testEmptyRecord(): Unit = {
+    def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+      // Step 1: Serialize/de- original [[HoodieRecord]]
+      val (cloned, originalBytes) = cloneUsingKryo(record)
+
+      assertEquals(expectedSize, originalBytes.length)
+      assertEquals(record, cloned)
+
+      // Step 2: Serialize the already cloned record, and assert that ser/de loop is lossless
+      val (_, clonedBytes) = cloneUsingKryo(cloned)
+      assertEquals(ByteBuffer.wrap(originalBytes), ByteBuffer.wrap(clonedBytes))
+    }
+
+    val key = new HoodieKey("rec-key", "part-path")
+
+    Seq(
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1, HoodieRecordType.AVRO), 74),
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 2, HoodieRecordType.SPARK), 74)
+    ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+  }
+
+  private def toLegacyAvroRecord(avroRecord: GenericRecord, key: HoodieKey): HoodieAvroRecord[OverwriteWithLatestAvroPayload] = {
+    val avroRecordPayload = new OverwriteWithLatestAvroPayloadWithEquality(avroRecord, 0)
+    val legacyRecord = new HoodieAvroRecord[OverwriteWithLatestAvroPayload](key, avroRecordPayload)
+
+    legacyRecord
+  }
+}
+
+object TestHoodieRecordSerialization {
+
+  private def cloneUsingKryo[T](r: HoodieRecord[T]): (HoodieRecord[T], Array[Byte]) = {
+    val serializer = SerializerSupport.newSerializer(true)
+
+    val buf = serializer.serialize(r)
+    val cloned: HoodieRecord[T] = serializer.deserialize(buf)
+
+    val bytes = new Array[Byte](buf.remaining())
+    buf.get(bytes)
+
+    (cloned, bytes)
+  }
+
+  private def toUnsafeRow(row: InternalRow, schema: StructType): UnsafeRow = {
+    val project = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema)
+    project(row)
+  }
+
+  private def toUnsafeRow(row: Row, schema: StructType): UnsafeRow = {
+    val encoder = SparkAdapterSupport.sparkAdapter.createSparkRowSerDe(schema)
+    val internalRow = encoder.serializeRow(row)
+    internalRow.asInstanceOf[UnsafeRow]
+  }
+
+  private def convertToAvroRecord(row: Row): GenericRecord = {
+    val schema = convertStructTypeToAvroSchema(row.schema, "testRecord", "testNamespace")
+
+    createInternalRowToAvroConverter(row.schema, schema, nullable = false)
+      .apply(toUnsafeRow(row, row.schema))
+  }
+
+  class OverwriteWithLatestAvroPayloadWithEquality(avroRecord: GenericRecord, _orderingVal: Comparable[_])
+    extends OverwriteWithLatestAvroPayload(avroRecord, _orderingVal) {
+    override def equals(obj: Any): Boolean =
+      obj match {
+        case p: OverwriteWithLatestAvroPayloadWithEquality =>
+          Objects.equals(ByteBuffer.wrap(this.recordBytes), ByteBuffer.wrap(p.recordBytes)) &&
+            Objects.equals(this.orderingVal, p.orderingVal)
+        case _ =>
+          false
+      }
+
+    override def hashCode(): Int = Objects.hash(avroRecord, _orderingVal.asInstanceOf[AnyRef])
+  }
+
+}
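As a closing note on how these records actually get exercised by Kryo in practice: Spark only routes shuffle data through Kryo when the Kryo serializer is enabled, so a job configuration along these lines (standard Spark settings, not something this commit adds) is what puts the new write/read hooks on the hot path:

    import org.apache.spark.SparkConf;

    public class KryoConfSketch {
      public static void main(String[] args) {
        // Standard Spark settings (not introduced by this commit): with the Kryo
        // serializer enabled, shuffled HoodieRecords go through the KryoSerializable
        // write/read hooks above instead of Java serialization
        SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.out.println(conf.get("spark.serializer"));
      }
    }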