This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch release-feature-rfc46 in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push: new df83709f51 [Minor] fix multi deser avro payload (#7021) df83709f51 is described below commit df83709f51b7df32ec15e69b97fa6631f3883ce5 Author: komao <masterwan...@gmail.com> AuthorDate: Tue Nov 29 07:54:51 2022 +0800 [Minor] fix multi deser avro payload (#7021) In HoodieAvroRecord, we will call isDelete, shouldIgnore before we write it to the file. Each method will deserialize HoodiePayload. So we add deserialization method in HoodieRecord and call this method once before calling isDelete or shouldIgnore. Co-authored-by: wangzixuan.wzxuan <wangzixuan.wzx...@bytedance.com> Co-authored-by: Alexey Kudinkin <ale...@infinilake.com> Co-authored-by: Alexey Kudinkin <alexey.kudin...@gmail.com> --- .../org/apache/hudi/io/HoodieAppendHandle.java | 26 ++++++------ .../java/org/apache/hudi/io/HoodieMergeHandle.java | 1 - .../hudi/commmon/model/HoodieSparkRecord.java | 30 +++++++++----- .../apache/hudi/common/model/BaseAvroPayload.java | 46 +++++++++++++++++++++- .../common/model/DefaultHoodieRecordPayload.java | 8 ++-- .../hudi/common/model/EventTimeAvroPayload.java | 9 ++--- .../apache/hudi/common/model/HoodieAvroRecord.java | 37 +++++++++++------ .../org/apache/hudi/common/model/HoodieRecord.java | 17 +++++++- .../model/OverwriteWithLatestAvroPayload.java | 24 +---------- .../common/model/PartialUpdateAvroPayload.java | 1 - .../apache/hudi/sink/utils/PayloadCreation.java | 2 +- .../main/java/org/apache/hudi/QuickstartUtils.java | 12 +++--- .../hudi/command/payload/ExpressionPayload.scala | 23 ++++++++--- .../model/TestHoodieRecordSerialization.scala | 2 +- 14 files changed, 154 insertions(+), 84 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 564d63ba77..2ef02b1dae 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; @@ -215,18 +216,16 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O // If the format can not record the operation field, nullify the DELETE payload manually. boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord)); - Option<HoodieRecord> finalRecord = Option.empty(); - if (!nullifyPayload && !hoodieRecord.isDelete(tableSchema, recordProperties)) { - if (hoodieRecord.shouldIgnore(tableSchema, recordProperties)) { - return Option.of(hoodieRecord); + Option<HoodieRecord> finalRecord = nullifyPayload ? 
Option.empty() : Option.of(hoodieRecord); + // Check for delete + if (finalRecord.isPresent() && !finalRecord.get().isDelete(tableSchema, recordProperties)) { + // Check for ignore ExpressionPayload + if (finalRecord.get().shouldIgnore(tableSchema, recordProperties)) { + return finalRecord; } // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - HoodieRecord rewrittenRecord; - if (schemaOnReadEnabled) { - rewrittenRecord = hoodieRecord.rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields); - } else { - rewrittenRecord = hoodieRecord.rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields); - } + HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields) + : finalRecord.get().rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields); HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord, writeSchemaWithMetaFields, recordProperties); finalRecord = Option.of(populatedRecord); if (isUpdateRecord) { @@ -236,6 +235,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O } recordsWritten++; } else { + finalRecord = Option.empty(); recordsDeleted++; } @@ -364,7 +364,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O updateWriteStatus(stat, result); } - if (config.isMetadataColumnStatsIndexEnabled()) { + // TODO MetadataColumnStatsIndex for spark record + // https://issues.apache.org/jira/browse/HUDI-5249 + if (config.isMetadataColumnStatsIndexEnabled() && recordMerger.getRecordType() == HoodieRecordType.AVRO) { final List<Schema.Field> fieldsToIndex; // If column stats index is enabled but columns not configured then we assume that // all columns should be indexed @@ -511,7 +513,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O record.seal(); } // fetch the ordering val first 
in case the record was deflated. - final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, config.getProps()); + final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, recordProperties); Option<HoodieRecord> indexedRecord = prepareRecord(record); if (indexedRecord.isPresent()) { // Skip the ignored record. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 276b318890..4e5370f108 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -347,7 +347,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props); Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null); Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft); - if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) { // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record. 
copyOldRecord = true; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java index 43000d1964..19b8cb5c65 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; @@ -122,6 +123,19 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo this.schema = schema; } + public HoodieSparkRecord( + HoodieKey key, + InternalRow data, + StructType schema, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation, + boolean copy) { + super(key, data, operation, currentLocation, newLocation); + this.copy = copy; + this.schema = schema; + } + @Override public HoodieSparkRecord newInstance() { return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy); @@ -175,7 +189,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData()); UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType); - return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), 
this.currentLocation, this.newLocation, copy); } @Override @@ -189,7 +203,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo // TODO add actual rewriting InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields); - return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -204,7 +218,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols); HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields); - return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -219,7 +233,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo } }); - return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -244,11 +258,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo @Override public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { - if (data != null && data.equals(SENTINEL)) { - return true; - } else { - return false; - } + return false; } @Override @@ -284,7 +294,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString(); } HoodieKey hoodieKey = new HoodieKey(key, partition); - return new HoodieSparkRecord(hoodieKey, 
data, structType, getOperation(), copy); + return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index cd3a95e6bf..aaafe61abf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -21,9 +21,11 @@ package org.apache.hudi.common.model; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import java.io.Serializable; +import java.util.Properties; /** * Base class for all AVRO record based payloads, that can be ordered based on a field. @@ -32,12 +34,14 @@ public abstract class BaseAvroPayload implements Serializable { /** * Avro data extracted from the source converted to bytes. */ - public final byte[] recordBytes; + protected final byte[] recordBytes; /** * For purposes of preCombining. */ - public final Comparable orderingVal; + protected final Comparable orderingVal; + + protected final boolean isDeletedRecord; /** * Instantiate {@link BaseAvroPayload}. @@ -48,8 +52,46 @@ public abstract class BaseAvroPayload implements Serializable { public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0]; this.orderingVal = orderingVal; + this.isDeletedRecord = record == null || isDeleteRecord(record); + if (orderingVal == null) { throw new HoodieException("Ordering value is null for record: " + record); } } + + public Comparable getOrderingVal() { + return orderingVal; + } + + /** + * Defines whether this implementation of {@link HoodieRecordPayload} is deleted. 
+ * We will not do deserialization in this method. + */ + public boolean isDeleted(Schema schema, Properties props) { + return isDeletedRecord; + } + + /** + * Defines whether this implementation of {@link HoodieRecordPayload} could produce + * {@link HoodieRecord#SENTINEL} + */ + public boolean canProduceSentinel() { + return false; + } + + /** + * @param genericRecord instance of {@link GenericRecord} of interest. + * @returns {@code true} if record represents a delete record. {@code false} otherwise. + */ + protected static boolean isDeleteRecord(GenericRecord genericRecord) { + final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. + if (genericRecord.getSchema().getField(isDeleteKey) == null) { + return false; + } + Object deleteMarker = genericRecord.get(isDeleteKey); + return (deleteMarker instanceof Boolean && (boolean) deleteMarker); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 5a588eafa5..a218e9dc33 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -51,7 +51,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { @Override public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } @@ -71,18 +71,18 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { /* * Now check if the incoming record is a delete record. 
*/ - return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } @Override public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } private static Option<Object> updateEventTime(GenericRecord record, Properties properties) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java index 7c8efb66e5..b750cffb6a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java @@ -46,7 +46,7 @@ public class EventTimeAvroPayload extends DefaultHoodieRecordPayload { @Override public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } @@ -61,17 +61,16 @@ public class EventTimeAvroPayload extends DefaultHoodieRecordPayload { /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord) ? 
Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } @Override public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } - GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); - return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + return Option.of(bytesToAvro(recordBytes, schema)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index de653054cd..a1318c462c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -51,6 +51,15 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor super(record); } + public HoodieAvroRecord( + HoodieKey key, + T data, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation) { + super(key, data, operation, currentLocation, newLocation); + } + public HoodieAvroRecord() { } @@ -113,14 +122,14 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor Option<IndexedRecord> avroRecordPayloadOpt = getData().getInsertValue(recordSchema, props); GenericRecord avroPayloadInNewSchema = HoodieAvroUtils.rewriteRecord((GenericRecord) avroRecordPayloadOpt.get(), targetSchema); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), this.currentLocation, this.newLocation); } @Override public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) 
throws IOException { GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation); } @Override @@ -133,30 +142,36 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor } }); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation); } @Override public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException { GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation); } @Override public boolean isDelete(Schema recordSchema, Properties props) throws IOException { - return !getData().getInsertValue(recordSchema, props).isPresent(); + if (this.data instanceof BaseAvroPayload) { + return ((BaseAvroPayload) this.data).isDeleted(recordSchema, props); + } else { + return !this.data.getInsertValue(recordSchema, props).isPresent(); + } } @Override public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { - Option<IndexedRecord> insertRecord = getData().getInsertValue(recordSchema, props); - // just skip the ignored record - if (insertRecord.isPresent() && 
insertRecord.get().equals(SENTINEL)) { - return true; - } else { - return false; + HoodieRecordPayload<?> recordPayload = getData(); + // NOTE: Currently only records borne by [[ExpressionPayload]] can currently be ignored, + // as such, we limit exposure of this method only to such payloads + if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload) recordPayload).canProduceSentinel()) { + Option<IndexedRecord> insertRecord = recordPayload.getInsertValue(recordSchema, props); + return insertRecord.isPresent() && insertRecord.get().equals(SENTINEL); } + + return false; } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 778186d4bc..255a2b2a10 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -124,12 +124,12 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf /** * Current location of record on storage. Filled in by looking up index */ - private HoodieRecordLocation currentLocation; + protected HoodieRecordLocation currentLocation; /** * New location of record on storage, after written. */ - private HoodieRecordLocation newLocation; + protected HoodieRecordLocation newLocation; /** * Indicates whether the object is sealed. 
@@ -154,6 +154,19 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf this.operation = operation; } + public HoodieRecord( + HoodieKey key, + T data, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation) { + this.key = key; + this.data = data; + this.currentLocation = currentLocation; + this.newLocation = newLocation; + this.operation = operation; + } + public HoodieRecord(HoodieRecord<T> record) { this(record.key, record.data, record.operation); this.currentLocation = record.currentLocation; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 5268d76281..a99e3005f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -69,31 +69,11 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload @Override public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } - IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); - if (isDeleteRecord((GenericRecord) indexedRecord)) { - return Option.empty(); - } else { - return Option.of(indexedRecord); - } - } - /** - * @param genericRecord instance of {@link GenericRecord} of interest. - * @returns {@code true} if record represents a delete record. {@code false} otherwise. - */ - protected boolean isDeleteRecord(GenericRecord genericRecord) { - final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; - // Modify to be compatible with new version Avro. - // The new version Avro throws for GenericRecord.get if the field name - // does not exist in the schema. 
- if (genericRecord.getSchema().getField(isDeleteKey) == null) { - return false; - } - Object deleteMarker = genericRecord.get(isDeleteKey); - return (deleteMarker instanceof Boolean && (boolean) deleteMarker); + return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index daa40acc76..7871e45156 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -143,7 +143,6 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro Schema schema, boolean isOldRecordNewer) throws IOException { Option<IndexedRecord> recordOption = getInsertValue(schema); - if (!recordOption.isPresent()) { // use natural order for delete record return Option.empty(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index fb850bace7..b7756a490b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -86,7 +86,7 @@ public class PayloadCreation implements Serializable { public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload) throws Exception { if (shouldCombine) { - return (HoodieRecordPayload<?>) constructor.newInstance(null, payload.orderingVal); + return (HoodieRecordPayload<?>) constructor.newInstance(null, payload.getOrderingVal()); } else { return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.empty()); } diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 453cbb4e74..59674b928f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -18,7 +18,9 @@ package org.apache.hudi; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -26,10 +28,6 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import java.io.IOException; @@ -239,8 +237,8 @@ public class QuickstartUtils { private static Option<String> convertToString(HoodieRecord record) { try { - String str = HoodieAvroUtils - .bytesToAvro(((OverwriteWithLatestAvroPayload) record.getData()).recordBytes, DataGenerator.avroSchema) + String str = ((OverwriteWithLatestAvroPayload) record.getData()) + .getInsertValue(DataGenerator.avroSchema) .toString(); str = "{" + str.substring(str.indexOf("\"ts\":")); return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 0f11cbf954..5d8e224477 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -24,10 +24,10 @@ import org.apache.hudi.AvroConversionUtils import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro +import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.io.HoodieWriteHandle import org.apache.hudi.sql.IExpressionEvaluator import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.Expression @@ -35,8 +35,8 @@ import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema, setWriteSchema} import org.apache.spark.sql.types.{StructField, StructType} -import java.util.{Base64, Properties} import java.util.function.Function +import java.util.{Base64, Properties} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -46,7 +46,7 @@ import scala.collection.mutable.ArrayBuffer * match and not-match actions and compute the final record to write. * * If there is no condition match the record, ExpressionPayload will return - * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record. + * a [[HoodieRecord.SENTINEL]], and the write handles will ignore this record. 
*/ class ExpressionPayload(record: GenericRecord, orderingVal: Comparable[_]) @@ -77,11 +77,14 @@ class ExpressionPayload(record: GenericRecord, processMatchedRecord(joinSqlRecord, Some(targetRecord), properties) } + override def canProduceSentinel: Boolean = true + /** * Process the matched record. Firstly test if the record matched any of the update-conditions, * if matched, return the update assignments result. Secondly, test if the record matched * delete-condition, if matched then return a delete record. Finally if no condition matched, - * return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * return a [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle. + * * @param inputRecord The input record to process. * @param targetRecord The origin exist record. * @param properties The properties. @@ -140,7 +143,7 @@ class ExpressionPayload(record: GenericRecord, /** * Process the not-matched record. Test if the record matched any of insert-conditions, * if matched then return the result of insert-assignment. Or else return a - * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle. * * @param inputRecord The input record to process. * @param properties The properties. 
@@ -173,6 +176,16 @@ class ExpressionPayload(record: GenericRecord, } } + override def isDeleted(schema: Schema, props: Properties): Boolean = { + val deleteConditionText = props.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) + val isUpdateRecord = props.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, "false").toBoolean + val isDeleteOnCondition= if (isUpdateRecord && deleteConditionText != null) { + !getInsertValue(schema, props).isPresent + } else false + + isDeletedRecord || isDeleteOnCondition + } + override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = { val incomingRecord = bytesToAvro(recordBytes, schema) if (isDeleteRecord(incomingRecord)) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala index eb1339ad2f..02cb46721c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala @@ -110,7 +110,7 @@ class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness { val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord) Seq( - (legacyRecord, 527), + (legacyRecord, 528), (avroIndexedRecord, 389) ) foreach { case (record, expectedSize) => routine(record, expectedSize) } }