This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this 
push:
     new df83709f51 [Minor] fix multi deser avro payload (#7021)
df83709f51 is described below

commit df83709f51b7df32ec15e69b97fa6631f3883ce5
Author: komao <masterwan...@gmail.com>
AuthorDate: Tue Nov 29 07:54:51 2022 +0800

    [Minor] fix multi deser avro payload (#7021)
    
    In HoodieAvroRecord, both isDelete and shouldIgnore are called before the 
record is written to the file, and each call deserializes the HoodiePayload. 
To avoid this repeated work, we add a deserialization method to HoodieRecord 
and call it once before calling isDelete or shouldIgnore.
    
    Co-authored-by: wangzixuan.wzxuan <wangzixuan.wzx...@bytedance.com>
    Co-authored-by: Alexey Kudinkin <ale...@infinilake.com>
    Co-authored-by: Alexey Kudinkin <alexey.kudin...@gmail.com>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 26 ++++++------
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  1 -
 .../hudi/commmon/model/HoodieSparkRecord.java      | 30 +++++++++-----
 .../apache/hudi/common/model/BaseAvroPayload.java  | 46 +++++++++++++++++++++-
 .../common/model/DefaultHoodieRecordPayload.java   |  8 ++--
 .../hudi/common/model/EventTimeAvroPayload.java    |  9 ++---
 .../apache/hudi/common/model/HoodieAvroRecord.java | 37 +++++++++++------
 .../org/apache/hudi/common/model/HoodieRecord.java | 17 +++++++-
 .../model/OverwriteWithLatestAvroPayload.java      | 24 +----------
 .../common/model/PartialUpdateAvroPayload.java     |  1 -
 .../apache/hudi/sink/utils/PayloadCreation.java    |  2 +-
 .../main/java/org/apache/hudi/QuickstartUtils.java | 12 +++---
 .../hudi/command/payload/ExpressionPayload.scala   | 23 ++++++++---
 .../model/TestHoodieRecordSerialization.scala      |  2 +-
 14 files changed, 154 insertions(+), 84 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 564d63ba77..2ef02b1dae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
@@ -215,18 +216,16 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       // If the format can not record the operation field, nullify the DELETE 
payload manually.
       boolean nullifyPayload = 
HoodieOperation.isDelete(hoodieRecord.getOperation()) && 
!config.allowOperationMetadataField();
       
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, 
String.valueOf(isUpdateRecord));
-      Option<HoodieRecord> finalRecord = Option.empty();
-      if (!nullifyPayload && !hoodieRecord.isDelete(tableSchema, 
recordProperties)) {
-        if (hoodieRecord.shouldIgnore(tableSchema, recordProperties)) {
-          return Option.of(hoodieRecord);
+      Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() : 
Option.of(hoodieRecord);
+      // Check for delete
+      if (finalRecord.isPresent() && !finalRecord.get().isDelete(tableSchema, 
recordProperties)) {
+        // Check for ignore ExpressionPayload
+        if (finalRecord.get().shouldIgnore(tableSchema, recordProperties)) {
+          return finalRecord;
         }
         // Convert GenericRecord to GenericRecord with hoodie commit metadata 
in schema
-        HoodieRecord rewrittenRecord;
-        if (schemaOnReadEnabled) {
-          rewrittenRecord = 
hoodieRecord.rewriteRecordWithNewSchema(tableSchema, recordProperties, 
writeSchemaWithMetaFields);
-        } else {
-          rewrittenRecord = hoodieRecord.rewriteRecord(tableSchema, 
recordProperties, writeSchemaWithMetaFields);
-        }
+        HoodieRecord rewrittenRecord = schemaOnReadEnabled ? 
finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties, 
writeSchemaWithMetaFields)
+            : finalRecord.get().rewriteRecord(tableSchema, recordProperties, 
writeSchemaWithMetaFields);
         HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord, 
writeSchemaWithMetaFields, recordProperties);
         finalRecord = Option.of(populatedRecord);
         if (isUpdateRecord) {
@@ -236,6 +235,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         }
         recordsWritten++;
       } else {
+        finalRecord = Option.empty();
         recordsDeleted++;
       }
 
@@ -364,7 +364,9 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       updateWriteStatus(stat, result);
     }
 
-    if (config.isMetadataColumnStatsIndexEnabled()) {
+    // TODO MetadataColumnStatsIndex for spark record
+    // https://issues.apache.org/jira/browse/HUDI-5249
+    if (config.isMetadataColumnStatsIndexEnabled() && 
recordMerger.getRecordType() == HoodieRecordType.AVRO) {
       final List<Schema.Field> fieldsToIndex;
       // If column stats index is enabled but columns not configured then we 
assume that
       // all columns should be indexed
@@ -511,7 +513,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, 
config.getProps());
+    final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, 
recordProperties);
     Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 276b318890..4e5370f108 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -347,7 +347,6 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
         Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
         Schema combineRecordSchema = 
mergeResult.map(Pair::getRight).orElse(null);
         Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
-
         if (combinedRecord.isPresent() && 
combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not 
update the new record.
           copyOldRecord = true;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index 43000d1964..19b8cb5c65 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -122,6 +123,19 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
     this.schema = schema;
   }
 
+  public HoodieSparkRecord(
+      HoodieKey key,
+      InternalRow data,
+      StructType schema,
+      HoodieOperation operation,
+      HoodieRecordLocation currentLocation,
+      HoodieRecordLocation newLocation,
+      boolean copy) {
+    super(key, data, operation, currentLocation, newLocation);
+    this.copy = copy;
+    this.schema = schema;
+  }
+
   @Override
   public HoodieSparkRecord newInstance() {
     return new HoodieSparkRecord(this.key, this.data, this.schema, 
this.operation, this.copy);
@@ -175,7 +189,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
     InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
     UnsafeProjection projection =
         HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, 
targetStructType);
-    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), 
targetStructType, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), 
targetStructType, getOperation(), this.currentLocation, this.newLocation, copy);
   }
 
   @Override
@@ -189,7 +203,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
     // TODO add actual rewriting
     InternalRow finalRow = new HoodieInternalRow(metaFields, data, 
containMetaFields);
 
-    return new HoodieSparkRecord(getKey(), finalRow, targetStructType, 
getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, targetStructType, 
getOperation(), this.currentLocation, this.newLocation, copy);
   }
 
   @Override
@@ -204,7 +218,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
         HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, 
newStructType, renameCols);
     HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, 
rewrittenRow, containMetaFields);
 
-    return new HoodieSparkRecord(getKey(), finalRow, newStructType, 
getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), finalRow, newStructType, 
getOperation(), this.currentLocation, this.newLocation, copy);
   }
 
   @Override
@@ -219,7 +233,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
       }
     });
 
-    return new HoodieSparkRecord(getKey(), updatableRow, structType, 
getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), updatableRow, structType, 
getOperation(), this.currentLocation, this.newLocation, copy);
   }
 
   @Override
@@ -244,11 +258,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
 
   @Override
   public boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException {
-    if (data != null && data.equals(SENTINEL)) {
-      return true;
-    } else {
-      return false;
-    }
+    return false;
   }
 
   @Override
@@ -284,7 +294,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> implements Kryo
       partition = 
data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), 
StringType).toString();
     }
     HoodieKey hoodieKey = new HoodieKey(key, partition);
-    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), 
copy);
+    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), 
this.currentLocation, this.newLocation, copy);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index cd3a95e6bf..aaafe61abf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -21,9 +21,11 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.Serializable;
+import java.util.Properties;
 
 /**
  * Base class for all AVRO record based payloads, that can be ordered based on 
a field.
@@ -32,12 +34,14 @@ public abstract class BaseAvroPayload implements 
Serializable {
   /**
    * Avro data extracted from the source converted to bytes.
    */
-  public final byte[] recordBytes;
+  protected final byte[] recordBytes;
 
   /**
    * For purposes of preCombining.
    */
-  public final Comparable orderingVal;
+  protected final Comparable orderingVal;
+
+  protected final boolean isDeletedRecord;
 
   /**
    * Instantiate {@link BaseAvroPayload}.
@@ -48,8 +52,46 @@ public abstract class BaseAvroPayload implements 
Serializable {
   public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
     this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : 
new byte[0];
     this.orderingVal = orderingVal;
+    this.isDeletedRecord = record == null || isDeleteRecord(record);
+
     if (orderingVal == null) {
       throw new HoodieException("Ordering value is null for record: " + 
record);
     }
   }
+
+  public Comparable getOrderingVal() {
+    return orderingVal;
+  }
+
+  /**
+   * Defines whether this implementation of {@link HoodieRecordPayload} is 
deleted.
+   * We will not do deserialization in this method.
+   */
+  public boolean isDeleted(Schema schema, Properties props) {
+    return isDeletedRecord;
+  }
+
+  /**
+   * Defines whether this implementation of {@link HoodieRecordPayload} could 
produce
+   * {@link HoodieRecord#SENTINEL}
+   */
+  public boolean canProduceSentinel() {
+    return false;
+  }
+
+  /**
+   * @param genericRecord instance of {@link GenericRecord} of interest.
+   * @return {@code true} if record represents a delete record. {@code false} 
otherwise.
+   */
+  protected static boolean isDeleteRecord(GenericRecord genericRecord) {
+    final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
+    // Modify to be compatible with new version Avro.
+    // The new version Avro throws for GenericRecord.get if the field name
+    // does not exist in the schema.
+    if (genericRecord.getSchema().getField(isDeleteKey) == null) {
+      return false;
+    }
+    Object deleteMarker = genericRecord.get(isDeleteKey);
+    return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 5a588eafa5..a218e9dc33 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -51,7 +51,7 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0) {
+    if (recordBytes.length == 0 || isDeletedRecord) {
       return Option.empty();
     }
 
@@ -71,18 +71,18 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
     /*
      * Now check if the incoming record is a delete record.
      */
-    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+    return Option.of(incomingRecord);
   }
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
-    if (recordBytes.length == 0) {
+    if (recordBytes.length == 0 || isDeletedRecord) {
       return Option.empty();
     }
     GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
     eventTime = updateEventTime(incomingRecord, properties);
 
-    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+    return Option.of(incomingRecord);
   }
 
   private static Option<Object> updateEventTime(GenericRecord record, 
Properties properties) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
index 7c8efb66e5..b750cffb6a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
@@ -46,7 +46,7 @@ public class EventTimeAvroPayload extends 
DefaultHoodieRecordPayload {
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0) {
+    if (recordBytes.length == 0 || isDeletedRecord) {
       return Option.empty();
     }
 
@@ -61,17 +61,16 @@ public class EventTimeAvroPayload extends 
DefaultHoodieRecordPayload {
     /*
      * Now check if the incoming record is a delete record.
      */
-    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+    return Option.of(incomingRecord);
   }
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
-    if (recordBytes.length == 0) {
+    if (recordBytes.length == 0 || isDeletedRecord) {
       return Option.empty();
     }
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
 
-    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+    return Option.of(bytesToAvro(recordBytes, schema));
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index de653054cd..a1318c462c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -51,6 +51,15 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> 
extends HoodieRecor
     super(record);
   }
 
+  public HoodieAvroRecord(
+      HoodieKey key,
+      T data,
+      HoodieOperation operation,
+      HoodieRecordLocation currentLocation,
+      HoodieRecordLocation newLocation) {
+    super(key, data, operation, currentLocation, newLocation);
+  }
+
   public HoodieAvroRecord() {
   }
 
@@ -113,14 +122,14 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
     Option<IndexedRecord> avroRecordPayloadOpt = 
getData().getInsertValue(recordSchema, props);
     GenericRecord avroPayloadInNewSchema =
         HoodieAvroUtils.rewriteRecord((GenericRecord) 
avroRecordPayloadOpt.get(), targetSchema);
-    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroPayloadInNewSchema), getOperation());
+    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), 
this.currentLocation, this.newLocation);
   }
 
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, 
Properties props, Schema newSchema, Map<String, String> renameCols) throws 
IOException {
     GenericRecord oldRecord = (GenericRecord) 
getData().getInsertValue(recordSchema, props).get();
     GenericRecord rewriteRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
-    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(rewriteRecord), getOperation());
+    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, 
this.newLocation);
   }
 
   @Override
@@ -133,30 +142,36 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
       }
     });
 
-    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroRecordPayload), getOperation());
+    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, 
this.newLocation);
   }
 
   @Override
   public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, 
String keyFieldName) throws IOException {
     GenericRecord avroRecordPayload = (GenericRecord) 
getData().getInsertValue(recordSchema, props).get();
     avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING);
-    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroRecordPayload), getOperation());
+    return new HoodieAvroRecord<>(getKey(), new 
RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, 
this.newLocation);
   }
 
   @Override
   public boolean isDelete(Schema recordSchema, Properties props) throws 
IOException {
-    return !getData().getInsertValue(recordSchema, props).isPresent();
+    if (this.data instanceof BaseAvroPayload) {
+      return ((BaseAvroPayload) this.data).isDeleted(recordSchema, props);
+    } else {
+      return !this.data.getInsertValue(recordSchema, props).isPresent();
+    }
   }
 
   @Override
   public boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException {
-    Option<IndexedRecord> insertRecord = 
getData().getInsertValue(recordSchema, props);
-    // just skip the ignored record
-    if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) {
-      return true;
-    } else {
-      return false;
+    HoodieRecordPayload<?> recordPayload = getData();
+    // NOTE: Only records borne by [[ExpressionPayload]] can 
currently be ignored,
+    //       as such, we limit exposure of this method only to such payloads
+    if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload) 
recordPayload).canProduceSentinel()) {
+      Option<IndexedRecord> insertRecord = 
recordPayload.getInsertValue(recordSchema, props);
+      return insertRecord.isPresent() && insertRecord.get().equals(SENTINEL);
     }
+
+    return false;
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 778186d4bc..255a2b2a10 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -124,12 +124,12 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
   /**
    * Current location of record on storage. Filled in by looking up index
    */
-  private HoodieRecordLocation currentLocation;
+  protected HoodieRecordLocation currentLocation;
 
   /**
    * New location of record on storage, after written.
    */
-  private HoodieRecordLocation newLocation;
+  protected HoodieRecordLocation newLocation;
 
   /**
    * Indicates whether the object is sealed.
@@ -154,6 +154,19 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     this.operation = operation;
   }
 
+  public HoodieRecord(
+      HoodieKey key,
+      T data,
+      HoodieOperation operation,
+      HoodieRecordLocation currentLocation,
+      HoodieRecordLocation newLocation) {
+    this.key = key;
+    this.data = data;
+    this.currentLocation = currentLocation;
+    this.newLocation = newLocation;
+    this.operation = operation;
+  }
+
   public HoodieRecord(HoodieRecord<T> record) {
     this(record.key, record.data, record.operation);
     this.currentLocation = record.currentLocation;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 5268d76281..a99e3005f1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -69,31 +69,11 @@ public class OverwriteWithLatestAvroPayload extends 
BaseAvroPayload
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
-    if (recordBytes.length == 0) {
+    if (recordBytes.length == 0 || isDeletedRecord) {
       return Option.empty();
     }
-    IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
-    if (isDeleteRecord((GenericRecord) indexedRecord)) {
-      return Option.empty();
-    } else {
-      return Option.of(indexedRecord);
-    }
-  }
 
-  /**
-   * @param genericRecord instance of {@link GenericRecord} of interest.
-   * @returns {@code true} if record represents a delete record. {@code false} 
otherwise.
-   */
-  protected boolean isDeleteRecord(GenericRecord genericRecord) {
-    final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
-    // Modify to be compatible with new version Avro.
-    // The new version Avro throws for GenericRecord.get if the field name
-    // does not exist in the schema.
-    if (genericRecord.getSchema().getField(isDeleteKey) == null) {
-      return false;
-    }
-    Object deleteMarker = genericRecord.get(isDeleteKey);
-    return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
+    return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, 
schema));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index daa40acc76..7871e45156 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -143,7 +143,6 @@ public class PartialUpdateAvroPayload extends 
OverwriteNonDefaultsWithLatestAvro
       Schema schema,
       boolean isOldRecordNewer) throws IOException {
     Option<IndexedRecord> recordOption = getInsertValue(schema);
-
     if (!recordOption.isPresent()) {
       // use natural order for delete record
       return Option.empty();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index fb850bace7..b7756a490b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -86,7 +86,7 @@ public class PayloadCreation implements Serializable {
 
   public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload) 
throws Exception {
     if (shouldCombine) {
-      return (HoodieRecordPayload<?>) constructor.newInstance(null, 
payload.orderingVal);
+      return (HoodieRecordPayload<?>) constructor.newInstance(null, 
payload.getOrderingVal());
     } else {
       return (HoodieRecordPayload<?>) 
this.constructor.newInstance(Option.empty());
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
index 453cbb4e74..59674b928f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -26,10 +28,6 @@ import 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 
 import java.io.IOException;
@@ -239,8 +237,8 @@ public class QuickstartUtils {
 
   private static Option<String> convertToString(HoodieRecord record) {
     try {
-      String str = HoodieAvroUtils
-          .bytesToAvro(((OverwriteWithLatestAvroPayload) 
record.getData()).recordBytes, DataGenerator.avroSchema)
+      String str = ((OverwriteWithLatestAvroPayload) record.getData())
+          .getInsertValue(DataGenerator.avroSchema)
           .toString();
       str = "{" + str.substring(str.indexOf("\"ts\":"));
       return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + 
record.getPartitionPath() + "\"}"));
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0f11cbf954..5d8e224477 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -24,10 +24,10 @@ import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
+import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.io.HoodieWriteHandle
 import org.apache.hudi.sql.IExpressionEvaluator
 import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -35,8 +35,8 @@ import org.apache.spark.sql.hudi.SerDeUtils
 import 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, 
getMergedSchema, setWriteSchema}
 import org.apache.spark.sql.types.{StructField, StructType}
 
-import java.util.{Base64, Properties}
 import java.util.function.Function
+import java.util.{Base64, Properties}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -46,7 +46,7 @@ import scala.collection.mutable.ArrayBuffer
  * match and not-match actions and compute the final record to write.
  *
  * If there is no condition match the record, ExpressionPayload will return
- * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this 
record.
+ * a [[HoodieRecord.SENTINEL]], and the write handles will ignore this record.
  */
 class ExpressionPayload(record: GenericRecord,
                         orderingVal: Comparable[_])
@@ -77,11 +77,14 @@ class ExpressionPayload(record: GenericRecord,
     processMatchedRecord(joinSqlRecord, Some(targetRecord), properties)
   }
 
+  override def canProduceSentinel: Boolean = true
+
   /**
    * Process the matched record. Firstly test if the record matched any of the 
update-conditions,
    * if matched, return the update assignments result. Secondly, test if the 
record matched
    * delete-condition, if matched then return a delete record. Finally if no 
condition matched,
-   * return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by 
HoodieWriteHandle.
+   * return a [[HoodieRecord.SENTINEL]] which will be ignored by 
HoodieWriteHandle.
+   *
    * @param inputRecord  The input record to process.
    * @param targetRecord The origin exist record.
    * @param properties   The properties.
@@ -140,7 +143,7 @@ class ExpressionPayload(record: GenericRecord,
   /**
    * Process the not-matched record. Test if the record matched any of 
insert-conditions,
    * if matched then return the result of insert-assignment. Or else return a
-   * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by 
HoodieWriteHandle.
+   * [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle.
    *
    * @param inputRecord The input record to process.
    * @param properties  The properties.
@@ -173,6 +176,16 @@ class ExpressionPayload(record: GenericRecord,
     }
   }
 
+  override def isDeleted(schema: Schema, props: Properties): Boolean = {
+    val deleteConditionText = 
props.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
+    val isUpdateRecord = 
props.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, 
"false").toBoolean
+    val isDeleteOnCondition= if (isUpdateRecord && deleteConditionText != 
null) {
+      !getInsertValue(schema, props).isPresent
+    } else false
+
+    isDeletedRecord || isDeleteOnCondition
+  }
+
   override def getInsertValue(schema: Schema, properties: Properties): 
HOption[IndexedRecord] = {
     val incomingRecord = bytesToAvro(recordBytes, schema)
     if (isDeleteRecord(incomingRecord)) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index eb1339ad2f..02cb46721c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -110,7 +110,7 @@ class TestHoodieRecordSerialization extends 
SparkClientFunctionalTestHarness {
     val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
 
     Seq(
-      (legacyRecord, 527),
+      (legacyRecord, 528),
       (avroIndexedRecord, 389)
     ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
   }

Reply via email to