This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new c0576131759  [HUDI-6798] Add record merging mode and implement event-time ordering in the new file group reader (#9894)
c0576131759 is described below

commit c05761317596585a3c0c3cc69a34b4407843351c
Author:     Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun Jun 9 20:48:09 2024 -0700

    [HUDI-6798] Add record merging mode and implement event-time ordering in the new file group reader (#9894)

    This PR adds a new table config `hoodie.record.merge.mode` to control the
    record merging mode and behavior in the new file group reader
    (`HoodieFileGroupReader`) and implements event-time ordering in it. The
    config `hoodie.record.merge.mode` is going to be the single config that
    determines how record merging happens in release 1.0 and beyond.

    ---------

    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../hudi/client/TestTableSchemaEvolution.java      |   3 +
 .../hudi/common/config/HoodieCommonConfig.java     |   3 +
 .../apache/hudi/common/config/RecordMergeMode.java |  36 ++++
 .../hudi/common/table/HoodieTableConfig.java       |  13 +-
 .../hudi/common/table/HoodieTableMetaClient.java   | 114 ++++++++++-
 .../table/log/BaseHoodieLogRecordReader.java       |   7 +
 .../table/log/HoodieMergedLogRecordReader.java     |  13 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 209 ++++++++++++++++-----
 .../common/table/read/HoodieFileGroupReader.java   |  26 ++-
 .../table/read/TestHoodieFileGroupReaderBase.java  |  77 ++++++--
 .../common/table/TestHoodieTableMetaClient.java    | 144 ++++++++++++++
 .../hudi/common/table/read/TestCustomMerger.java   |   4 +
 .../common/table/read/TestEventTimeMerging.java    |   4 +
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |   6 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  11 +-
 15 files changed, 588 insertions(+), 82 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index f5fa70c6668..496b42c13d6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -48,6 +49,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
@@ -165,6 +167,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
       HoodieTableMetaClient.withPropertyBuilder()
           .fromMetaClient(metaClient)
           .setTableType(HoodieTableType.MERGE_ON_READ)
+          .setRecordMergeMode(RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue()))
           .setTimelineLayoutVersion(VERSION_1)
           .initTable(metaClient.getStorageConf().newInstance(), metaClient.getBasePath());
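[Editor's illustration] The builder call in the test above hints at the intended usage of the
new config. Below is a minimal, hypothetical sketch of initializing a merge-on-read table with
event-time ordering through the PropertyBuilder API extended by this commit; the table name and
precombine field are invented for illustration, setRecordMergeMode is added in this diff, and
the other setters are assumed pre-existing builder methods.

    import org.apache.hudi.common.config.RecordMergeMode;
    import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    import java.util.Properties;

    public class RecordMergeModeSketch {
      public static Properties eventTimeOrderedTableProps() {
        // build() now runs inferRecordMergeMode() and validateMergeConfigs(),
        // so a payload class that contradicts the chosen mode fails fast with
        // an IllegalArgumentException (see TestHoodieTableMetaClient below).
        return HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ.name())
            .setTableName("trips")                                    // hypothetical name
            .setRecordMergeMode(RecordMergeMode.EVENT_TIME_ORDERING)  // new in this PR
            .setPayloadClassName(DefaultHoodieRecordPayload.class.getName())
            .setPreCombineField("ts")                                 // event-time field
            .build();
      }
    }

If setRecordMergeMode is omitted, build() infers the mode from the payload class or type
(OverwriteWithLatestAvroPayload maps to OVERWRITE_WITH_LATEST, DefaultHoodieRecordPayload to
EVENT_TIME_ORDERING, anything else to CUSTOM), as exercised by the new tests further down.

diff --git 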
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index 1a4c2e31780..c96b07ee4f0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.config; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.collection.ExternalSpillableMap; @@ -81,6 +82,8 @@ public class HoodieCommonConfig extends HoodieConfig { + " operation will fail schema compatibility check. Set this option to true will make the missing " + " column be filled with null values to successfully complete the write operation."); + public static final ConfigProperty<String> RECORD_MERGE_MODE = HoodieTableConfig.RECORD_MERGE_MODE; + public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.common.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java new file mode 100644 index 00000000000..641f3514ad6 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.config; + +@EnumDescription("Determines the logic of merging updates") +public enum RecordMergeMode { + @EnumFieldDescription("Using transaction time to merge records, i.e., the record from later " + + "transaction overwrites the earlier record with the same key.") + OVERWRITE_WITH_LATEST, + + @EnumFieldDescription("Using event time as the ordering to merge records, i.e., the record " + + "with the larger event time overwrites the record with the smaller event time on the " + + "same key, regardless of transaction time. 
The event time or preCombine field needs " + + "to be specified by the user.") + EVENT_TIME_ORDERING, + + @EnumFieldDescription("Using custom merging logic specified by the user.") + CUSTOM +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 87263a13f9d..b3bf9668d93 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.OrderedProperties; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; @@ -45,8 +46,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.slf4j.Logger; @@ -175,6 +176,12 @@ public class HoodieTableConfig extends HoodieConfig { .noDefaultValue() .withDocumentation("Version of timeline used, by the table."); + public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty + .key("hoodie.record.merge.mode") + .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name()) + .sinceVersion("1.0.0") + .withDocumentation(RecordMergeMode.class); + public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty .key("hoodie.compaction.payload.class") .defaultValue(DefaultHoodieRecordPayload.class.getName()) @@ -532,6 +539,10 @@ public class HoodieTableConfig extends HoodieConfig { setValue(VERSION, Integer.toString(tableVersion.versionCode())); } + public RecordMergeMode getRecordMergeMode() { + return RecordMergeMode.valueOf(getStringOrDefault(RECORD_MERGE_MODE).toUpperCase()); + } + /** * Read the payload class for HoodieRecords from the table properties. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index e8e99ff9a0c..250091ecec6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -22,17 +22,20 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetaserverConfig; import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.FileSystemRetryConfig; import org.apache.hudi.common.fs.NoOpConsistencyGuard; import org.apache.hudi.common.model.BootstrapIndexType; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieIndexMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTimelineTimeZone; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.RecordPayloadType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; @@ -47,7 +50,6 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.TableNotFoundException; @@ -76,10 +78,14 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION; +import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE; import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.io.storage.HoodieIOFactory.getIOFactory; /** @@ -161,7 +167,7 @@ public class HoodieTableMetaClient implements Serializable { Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion(); if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { // Ensure layout version passed in config is not lower than the one seen in hoodie.properties - ValidationUtils.checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0, + checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0, "Layout Version defined in hoodie properties has higher version (" + tableConfigVersion.get() + ") than the one passed in config (" + layoutVersion.get() + ")"); } @@ -196,7 
+202,7 @@ public class HoodieTableMetaClient implements Serializable { String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) { - ValidationUtils.checkState( + checkState( !indexMetadataOpt.isPresent() || !indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName), "Index metadata is already present"); List<String> columnNames = new ArrayList<>(columns.keySet()); @@ -892,9 +898,9 @@ public class HoodieTableMetaClient implements Serializable { } public HoodieTableMetaClient build() { - ValidationUtils.checkArgument(conf != null || storage != null, + checkArgument(conf != null || storage != null, "Storage configuration or HoodieStorage needs to be set to init HoodieTableMetaClient"); - ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient"); + checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient"); if (timeGeneratorConfig == null) { timeGeneratorConfig = HoodieTimeGeneratorConfig.newBuilder().withPath(basePath).build(); } @@ -923,6 +929,7 @@ public class HoodieTableMetaClient implements Serializable { private String recordKeyFields; private String secondaryKeyFields; private String archiveLogFolder; + private RecordMergeMode recordMergeMode; private String payloadClassName; private String payloadType; private String recordMergerStrategy; @@ -999,6 +1006,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setRecordMergeMode(RecordMergeMode recordMergeMode) { + this.recordMergeMode = recordMergeMode; + return this; + } + public PropertyBuilder setPayloadClassName(String payloadClassName) { this.payloadClassName = payloadClassName; return this; @@ -1144,6 +1156,7 @@ public class HoodieTableMetaClient implements Serializable { return setTableType(metaClient.getTableType()) .setTableName(metaClient.getTableConfig().getTableName()) .setArchiveLogFolder(metaClient.getTableConfig().getArchivelogFolder()) + .setRecordMergeMode(metaClient.getTableConfig().getRecordMergeMode()) .setPayloadClassName(metaClient.getTableConfig().getPayloadClass()) .setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy()); } @@ -1173,6 +1186,10 @@ public class HoodieTableMetaClient implements Serializable { setArchiveLogFolder( hoodieConfig.getString(HoodieTableConfig.ARCHIVELOG_FOLDER)); } + if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE)) { + setRecordMergeMode( + RecordMergeMode.valueOf(hoodieConfig.getString(HoodieTableConfig.RECORD_MERGE_MODE).toUpperCase())); + } if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)) { setPayloadClassName(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME)); } else if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)) { @@ -1262,8 +1279,8 @@ public class HoodieTableMetaClient implements Serializable { } public Properties build() { - ValidationUtils.checkArgument(tableType != null, "tableType is null"); - ValidationUtils.checkArgument(tableName != null, "tableName is null"); + checkArgument(tableType != null, "tableType is null"); + checkArgument(tableName != null, "tableName is null"); HoodieTableConfig tableConfig = new HoodieTableConfig(); @@ -1285,6 +1302,11 @@ public class HoodieTableMetaClient implements Serializable { if (recordMergerStrategy != null) { tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy); } + inferRecordMergeMode(); + validateMergeConfigs(); + if (recordMergeMode != null) { + 
tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name()); + } } if (null != tableCreateSchema) { @@ -1385,5 +1407,83 @@ public class HoodieTableMetaClient implements Serializable { throws IOException { return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build()); } + + private void inferRecordMergeMode() { + if (null == recordMergeMode) { + boolean payloadClassNameSet = null != payloadClassName; + boolean payloadTypeSet = null != payloadType; + boolean recordMergerStrategySet = null != recordMergerStrategy; + + if (!recordMergerStrategySet + || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) { + if (payloadClassNameSet) { + if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) { + recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST; + } else if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) { + recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING; + } else { + recordMergeMode = RecordMergeMode.CUSTOM; + } + } else if (payloadTypeSet) { + if (payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) { + recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST; + } else if (payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) { + recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING; + } else { + recordMergeMode = RecordMergeMode.CUSTOM; + } + } else { + LOG.warn("One of the payload class name or payload type must be set for the MERGE_ON_READ table"); + recordMergeMode = RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue()); + LOG.warn("Setting the record merge mode to the default: {}", recordMergeMode); + } + } else { + // Custom merger strategy is set + recordMergeMode = RecordMergeMode.CUSTOM; + } + } + } + + private void validateMergeConfigs() { + boolean payloadClassNameSet = null != payloadClassName; + boolean payloadTypeSet = null != payloadType; + boolean recordMergerStrategySet = null != recordMergerStrategy; + boolean recordMergeModeSet = null != recordMergeMode; + + checkArgument(recordMergeModeSet, + "Record merge mode " + HoodieTableConfig.RECORD_MERGE_MODE.key() + " should be set"); + switch (recordMergeMode) { + case OVERWRITE_WITH_LATEST: + checkArgument((!payloadClassNameSet && !payloadTypeSet) + || (payloadClassNameSet && payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) + || (payloadTypeSet && payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())), + constructMergeConfigErrorMessage()); + break; + case EVENT_TIME_ORDERING: + checkArgument((!payloadClassNameSet && !payloadTypeSet) + || (payloadClassNameSet && payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) + || (payloadTypeSet && payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())), + constructMergeConfigErrorMessage()); + checkArgument(!recordMergerStrategySet + || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID), + "Record merger strategy (" + (recordMergerStrategySet ? recordMergerStrategy : "null") + + ") should be consistent with the record merging mode EVENT_TIME_ORDERING"); + break; + case CUSTOM: + default: + // No op + } + } + + private String constructMergeConfigErrorMessage() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("Payload class name ("); + stringBuilder.append(payloadClassName != null ? payloadClassName : "null"); + stringBuilder.append(") or type ("); + stringBuilder.append(payloadType != null ? 
payloadType : "null"); + stringBuilder.append(") should be consistent with the record merge mode "); + stringBuilder.append(recordMergeMode); + return stringBuilder.toString(); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java index d58c54a929f..2f38dc9b258 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.HoodieLogFile; @@ -97,6 +98,8 @@ public abstract class BaseHoodieLogRecordReader<T> { protected final String preCombineField; // Stateless component for merging records protected final HoodieRecordMerger recordMerger; + // Record merge mode + protected final RecordMergeMode recordMergeMode; private final TypedProperties payloadProps; // Log File Paths protected final List<String> logFilePaths; @@ -148,6 +151,7 @@ public abstract class BaseHoodieLogRecordReader<T> { Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, + RecordMergeMode recordMergeMode, HoodieFileGroupRecordBuffer<T> recordBuffer) { this.readerContext = readerContext; this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); @@ -166,6 +170,7 @@ public abstract class BaseHoodieLogRecordReader<T> { } this.payloadProps = props; this.recordMerger = recordMerger; + this.recordMergeMode = recordMergeMode; this.totalLogFiles.addAndGet(logFilePaths.size()); this.logFilePaths = logFilePaths; this.reverseReader = reverseReader; @@ -866,6 +871,8 @@ public abstract class BaseHoodieLogRecordReader<T> { throw new UnsupportedOperationException(); } + public abstract Builder withRecordMergeMode(RecordMergeMode recordMergeMode); + public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java index bcc821a34a1..c79de61536c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger; @@ -75,9 +76,10 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, + RecordMergeMode recordMergeMode, HoodieFileGroupRecordBuffer<T> recordBuffer) { super(readerContext, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField, - forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger, recordBuffer); + forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordMerger, recordMergeMode, recordBuffer); this.scannedPrefixes = new HashSet<>(); if (forceFullScan) { @@ -228,6 +230,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> private boolean forceFullScan = true; private boolean enableOptimizedLogBlocksScan = false; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; + private RecordMergeMode recordMergeMode; private HoodieFileGroupRecordBuffer<T> recordBuffer; @@ -293,6 +296,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> return this; } + @Override + public Builder<T> withRecordMergeMode(RecordMergeMode recordMergeMode) { + this.recordMergeMode = recordMergeMode; + return this; + } + public Builder<T> withKeyFiledOverride(String keyFieldOverride) { this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride); return this; @@ -324,7 +333,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> withOperationField, forceFullScan, Option.ofNullable(partitionName), Option.ofNullable(keyFieldOverride), - enableOptimizedLogBlocksScan, recordMerger, + enableOptimizedLogBlocksScan, recordMerger, recordMergeMode, recordBuffer); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java index ed8d643b215..984d9740ceb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.read; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.DeleteRecord; @@ -33,11 +34,13 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieCorruptedDataException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieKeyException; import org.apache.hudi.exception.HoodieValidationException; @@ -60,12 +63,14 @@ import java.util.function.Function; import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; +import static org.apache.hudi.common.table.read.HoodieFileGroupReader.getRecordMergeMode; public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> { protected final HoodieReaderContext<T> readerContext; protected final Schema readerSchema; protected final Option<String> partitionNameOverrideOpt; protected final Option<String[]> partitionPathFieldOpt; + protected final RecordMergeMode recordMergeMode; protected final HoodieRecordMerger recordMerger; 
protected final TypedProperties payloadProps; protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records; @@ -90,6 +95,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); this.partitionNameOverrideOpt = partitionNameOverrideOpt; this.partitionPathFieldOpt = partitionPathFieldOpt; + this.recordMergeMode = getRecordMergeMode(payloadProps); this.recordMerger = recordMerger; this.payloadProps = payloadProps; this.internalSchema = readerContext.getSchemaHandler().getInternalSchema(); @@ -147,6 +153,37 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr records.clear(); } + /** + * Compares two {@link Comparable}s. If both are numbers, converts them to {@link Long} for comparison. + * If one of the {@link Comparable}s is a String, assumes that both are String values for comparison. + * + * @param o1 {@link Comparable} object. + * @param o2 other {@link Comparable} object to compare to. + * @return comparison result. + */ + @VisibleForTesting + static int compareTo(Comparable o1, Comparable o2) { + // TODO(HUDI-7848): fix the delete records to contain the correct ordering value type + // so this util with the number comparison is not necessary. + try { + return o1.compareTo(o2); + } catch (ClassCastException e) { + if (o1 instanceof Number && o2 instanceof Number) { + Long o1LongValue = ((Number) o1).longValue(); + Long o2LongValue = ((Number) o2).longValue(); + return o1LongValue.compareTo(o2LongValue); + } else if (o1 instanceof String || o2 instanceof String) { + return o1.toString().compareTo(o2.toString()); + } else { + throw new IllegalArgumentException("Cannot compare values in different types: " + + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")"); + } + } catch (Throwable e) { + throw new HoodieException("Cannot compare values: " + + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")", e); + } + } + /** * Merge two log data records if needed. * @@ -160,42 +197,76 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr Map<String, Object> metadata, Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException { if (existingRecordMetadataPair != null) { - // Merge and store the combined record - // Note that the incoming `record` is from an older commit, so it should be put as - // the `older` in the merge API - Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = enablePartialMerging - ? 
recordMerger.partialMerge( - readerContext.constructHoodieRecord(Option.of(record), metadata), - (Schema) metadata.get(INTERNAL_META_SCHEMA), - readerContext.constructHoodieRecord( - existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), - (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA), - readerSchema, - payloadProps) - : recordMerger.merge( - readerContext.constructHoodieRecord(Option.of(record), metadata), - (Schema) metadata.get(INTERNAL_META_SCHEMA), - readerContext.constructHoodieRecord( - existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), - (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA), - payloadProps); - - if (!combinedRecordAndSchemaOpt.isPresent()) { + if (enablePartialMerging) { + // TODO(HUDI-7843): decouple the merging logic from the merger + // and use the record merge mode to control how to merge partial updates + // Merge and store the combined record + // Note that the incoming `record` is from an older commit, so it should be put as + // the `older` in the merge API + Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.partialMerge( + readerContext.constructHoodieRecord(Option.of(record), metadata), + (Schema) metadata.get(INTERNAL_META_SCHEMA), + readerContext.constructHoodieRecord( + existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), + (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA), + readerSchema, + payloadProps); + if (!combinedRecordAndSchemaOpt.isPresent()) { + return Option.empty(); + } + Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get(); + HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft(); + + // If pre-combine returns existing record, no need to update it + if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) { + return Option.of(Pair.of( + combinedRecord.getData(), + readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight()))); + } return Option.empty(); + } else { + switch (recordMergeMode) { + case OVERWRITE_WITH_LATEST: + return Option.empty(); + case EVENT_TIME_ORDERING: + Comparable existingOrderingValue = readerContext.getOrderingValue( + existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema, payloadProps); + if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingValue)) { + return Option.empty(); + } + Comparable incomingOrderingValue = readerContext.getOrderingValue( + Option.of(record), metadata, readerSchema, payloadProps); + if (compareTo(incomingOrderingValue, existingOrderingValue) > 0) { + return Option.of(Pair.of(record, metadata)); + } + return Option.empty(); + case CUSTOM: + default: + // Merge and store the combined record + // Note that the incoming `record` is from an older commit, so it should be put as + // the `older` in the merge API + Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.merge( + readerContext.constructHoodieRecord(Option.of(record), metadata), + (Schema) metadata.get(INTERNAL_META_SCHEMA), + readerContext.constructHoodieRecord( + existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), + (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA), + payloadProps); + + if (!combinedRecordAndSchemaOpt.isPresent()) { + return Option.empty(); + } + + Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get(); + HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft(); + + // If pre-combine returns existing record, no need to update it + if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) { + return Option.of(Pair.of(combinedRecord.getData(), metadata)); + } + return Option.empty(); + } } - - Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get(); - HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft(); - - // If pre-combine returns existing record, no need to update it - if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) { - return Option.of(Pair.of( - combinedRecord.getData(), - enablePartialMerging - ? readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight()) - : metadata)); - } - return Option.empty(); } else { // Put the record as is // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific @@ -265,7 +336,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr * @param dataBlock current processed block * @return final read schema. */ - protected Option<Pair<Function<T,T>, Schema>> composeEvolvedSchemaTransformer( + protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer( HoodieDataBlock dataBlock) { if (internalSchema.isEmptySchema()) { return Option.empty(); @@ -274,7 +345,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME)); InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient, false); - Pair<InternalSchema, Map<String,String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema, + Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema, true, false, false).mergeSchemaGetRenamed(); Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName()); assert mergedAvroSchema.equals(readerSchema); @@ -297,32 +368,63 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr return newer; } - Option<Pair<HoodieRecord, Schema>> mergedRecord; if (enablePartialMerging) { - mergedRecord = recordMerger.partialMerge( + // TODO(HUDI-7843): decouple the merging logic from the merger + // and use the record merge mode to control how to merge partial updates + Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.partialMerge( readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA), readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), readerSchema, payloadProps); - } else { - mergedRecord = recordMerger.merge( - readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA), - readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps); - } - if (mergedRecord.isPresent() - && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), payloadProps)) { - if (!mergedRecord.get().getRight().equals(readerSchema)) { - return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData()); + if 
(mergedRecord.isPresent() + && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), payloadProps)) { + if (!mergedRecord.get().getRight().equals(readerSchema)) { + return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData()); + } + return Option.ofNullable((T) mergedRecord.get().getLeft().getData()); + } + return Option.empty(); + } else { + switch (recordMergeMode) { + case OVERWRITE_WITH_LATEST: + return newer; + case EVENT_TIME_ORDERING: + Comparable oldOrderingValue = readerContext.getOrderingValue( + older, olderInfoMap, readerSchema, payloadProps); + if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) { + return newer; + } + Comparable newOrderingValue = readerContext.getOrderingValue( + newer, newerInfoMap, readerSchema, payloadProps); + if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) { + return Option.empty(); + } + if (compareTo(oldOrderingValue, newOrderingValue) > 0) { + return older; + } + return newer; + case CUSTOM: + default: + Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge( + readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA), + readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps); + + if (mergedRecord.isPresent() + && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), payloadProps)) { + if (!mergedRecord.get().getRight().equals(readerSchema)) { + return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData()); + } + return Option.ofNullable((T) mergedRecord.get().getLeft().getData()); + } + return Option.empty(); } - return Option.ofNullable((T) mergedRecord.get().getLeft().getData()); } - return Option.empty(); } /** * Filter a record for downstream processing when: - * 1. A set of pre-specified keys exists. - * 2. The key of the record is not contained in the set. + * 1. A set of pre-specified keys exists. + * 2. The key of the record is not contained in the set. 
*/ protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema writerSchema) { String recordKey = readerContext.getValue(record, writerSchema, keyFieldName).toString(); @@ -419,4 +521,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr .orElseGet(dataBlock::getSchema); return Pair.of(transformer, evolvedSchema); } + + private boolean isDeleteRecordWithNaturalOrder(Option<T> rowOption, + Comparable orderingValue) { + return rowOption.isEmpty() && orderingValue.equals(0); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index 396da4166a7..8661a91a12f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -19,7 +19,9 @@ package org.apache.hudi.common.table.read; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.BaseFile; @@ -46,11 +48,13 @@ import org.apache.avro.Schema; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Properties; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; /** * A file group reader that iterates through the records in a single file group. @@ -73,6 +77,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { private final long length; // Core structure to store and process records. 
private final HoodieFileGroupRecordBuffer<T> recordBuffer; + private final RecordMergeMode recordMergeMode; private ClosableIterator<T> baseFileIterator; private final HoodieRecordMerger recordMerger; private final Option<UnaryOperator<T>> outputConverter; @@ -102,6 +107,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { this.props = props; this.start = start; this.length = length; + this.recordMergeMode = getRecordMergeMode(props); this.recordMerger = readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy()); readerContext.setRecordMerger(this.recordMerger); readerContext.setTablePath(tablePath); @@ -154,11 +160,11 @@ public final class HoodieFileGroupReader<T> implements Closeable { private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException { BaseFile dataFile = baseFile.getBootstrapBaseFile().get(); - Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = readerContext.getSchemaHandler().getBootstrapRequiredFields(); - Pair<List<Schema.Field>,List<Schema.Field>> allFields = readerContext.getSchemaHandler().getBootstrapDataFields(); - Option<Pair<ClosableIterator<T>,Schema>> dataFileIterator = + Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = readerContext.getSchemaHandler().getBootstrapRequiredFields(); + Pair<List<Schema.Field>, List<Schema.Field>> allFields = readerContext.getSchemaHandler().getBootstrapDataFields(); + Option<Pair<ClosableIterator<T>, Schema>> dataFileIterator = makeBootstrapBaseFileIteratorHelper(requiredFields.getRight(), allFields.getRight(), dataFile); - Option<Pair<ClosableIterator<T>,Schema>> skeletonFileIterator = + Option<Pair<ClosableIterator<T>, Schema>> skeletonFileIterator = makeBootstrapBaseFileIteratorHelper(requiredFields.getLeft(), allFields.getLeft(), baseFile); if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) { throw new IllegalStateException("should not be here if only partition cols are required"); @@ -180,9 +186,9 @@ public final class HoodieFileGroupReader<T> implements Closeable { * @param file file to be read * @return pair of the record iterator of the file, and the schema of the data being read */ - private Option<Pair<ClosableIterator<T>,Schema>> makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields, - List<Schema.Field> allFields, - BaseFile file) throws IOException { + private Option<Pair<ClosableIterator<T>, Schema>> makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields, + List<Schema.Field> allFields, + BaseFile file) throws IOException { if (requiredFields.isEmpty()) { return Option.empty(); } @@ -225,6 +231,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { .withPartition(getRelativePartitionPath( new StoragePath(path), logFiles.get(0).getPath().getParent())) .withRecordMerger(recordMerger) + .withRecordMergeMode(recordMergeMode) .withRecordBuffer(recordBuffer) .build(); logRecordReader.close(); @@ -244,6 +251,11 @@ public final class HoodieFileGroupReader<T> implements Closeable { return new HoodieFileGroupReaderIterator<>(this); } + public static RecordMergeMode getRecordMergeMode(Properties props) { + String mergeMode = getStringWithAltKeys(props, HoodieCommonConfig.RECORD_MERGE_MODE, true).toUpperCase(); + return RecordMergeMode.valueOf(mergeMode); + } + public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> { private HoodieFileGroupReader<T> reader; diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java index a8a95887c18..9f3f8acf81c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java @@ -22,6 +22,7 @@ package org.apache.hudi.common.table.read; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; @@ -42,22 +43,28 @@ import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS; import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY; +import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE; +import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.compareTo; import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice; import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Tests {@link HoodieFileGroupReader} with different engines @@ -80,9 +87,55 @@ public abstract class TestHoodieFileGroupReaderBase<T> { Schema schema, String fileGroupId); + public abstract Comparable getComparableUTF8String(String value); + + @Test + public void testCompareToComparable() { + // Test same type + assertEquals(1, compareTo(Boolean.TRUE, Boolean.FALSE)); + assertEquals(0, compareTo(Boolean.TRUE, Boolean.TRUE)); + assertEquals(-1, compareTo(Boolean.FALSE, Boolean.TRUE)); + assertEquals(1, compareTo(20, 15)); + assertEquals(0, compareTo(15, 15)); + assertEquals(-1, compareTo(10, 15)); + assertEquals(1, compareTo(1.1f, 1.0f)); + assertEquals(0, compareTo(1.0f, 1.0f)); + assertEquals(-1, compareTo(0.9f, 1.0f)); + assertEquals(1, compareTo(1.1, 1.0)); + assertEquals(0, compareTo(1.0, 1.0)); + assertEquals(-1, compareTo(0.9, 1.0)); + assertEquals(1, compareTo("value2", "value1")); + assertEquals(0, compareTo("value1", "value1")); + assertEquals(-1, compareTo("value1", "value2")); + // Test different types which are comparable + assertEquals(1, compareTo(Long.MAX_VALUE / 2L, 10)); + assertEquals(1, compareTo(20, 10L)); + assertEquals(0, compareTo(10L, 10)); + assertEquals(0, compareTo(10, 10L)); + assertEquals(-1, compareTo(10, Long.MAX_VALUE)); + 
assertEquals(-1, compareTo(10L, 20)); + assertEquals(1, compareTo(getComparableUTF8String("value2"), "value1")); + assertEquals(1, compareTo("value2", getComparableUTF8String("value1"))); + assertEquals(0, compareTo(getComparableUTF8String("value1"), "value1")); + assertEquals(0, compareTo("value1", getComparableUTF8String("value1"))); + assertEquals(-1, compareTo(getComparableUTF8String("value1"), "value2")); + assertEquals(-1, compareTo("value1", getComparableUTF8String("value2"))); + } + + private static Stream<Arguments> testArguments() { + return Stream.of( + arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"), + arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "parquet"), + arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro"), + arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet"), + arguments(RecordMergeMode.CUSTOM, "avro"), + arguments(RecordMergeMode.CUSTOM, "parquet") + ); + } + @ParameterizedTest - @ValueSource(strings = {"avro", "parquet"}) - public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) throws Exception { + @MethodSource("testArguments") + public void testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception { Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs()); writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat); @@ -90,23 +143,23 @@ public abstract class TestHoodieFileGroupReaderBase<T> { // One commit; reading one file group containing a base file only commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs); validateOutputFromFileGroupReader( - getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0); + getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0, recordMergeMode); // Two commits; reading one file group containing a base file and a log file commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs); validateOutputFromFileGroupReader( - getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1); + getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1, recordMergeMode); // Three commits; reading one file group containing a base file and two log files commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), UPSERT.value(), writeConfigs); validateOutputFromFileGroupReader( - getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2); + getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2, recordMergeMode); } } @ParameterizedTest - @ValueSource(strings = {"avro", "parquet"}) - public void testReadLogFilesOnlyInMergeOnReadTable(String logDataBlockFormat) throws Exception { + @MethodSource("testArguments") + public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception { Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs()); writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat); // Use InMemoryIndex to generate log only mor table @@ -116,12 +169,12 @@ public abstract class TestHoodieFileGroupReaderBase<T> { // One commit; reading one file group containing a base file only commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs); validateOutputFromFileGroupReader( - getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1); + getStorageConf(), getBasePath(), 
dataGen.getPartitionPaths(), false, 1, recordMergeMode); // Two commits; reading one file group containing a base file and a log file commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs); validateOutputFromFileGroupReader( - getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2); + getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2, recordMergeMode); } } @@ -145,7 +198,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> { String tablePath, String[] partitionPaths, boolean containsBaseFile, - int expectedLogFileNum) throws Exception { + int expectedLogFileNum, + RecordMergeMode recordMergeMode) throws Exception { HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath); Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf); @@ -165,6 +219,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> { props.setProperty("hoodie.datasource.write.precombine.field", "timestamp"); props.setProperty("hoodie.payload.ordering.field", "timestamp"); props.setProperty(RECORD_MERGER_STRATEGY.key(), RECORD_MERGER_STRATEGY.defaultValue()); + props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name()); if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) { props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS)); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index b4e2fca80d3..60358872fc8 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -18,6 +18,12 @@ package org.apache.hudi.common.table; +import org.apache.hudi.common.config.RecordMergeMode; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.RecordPayloadType; +import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -28,16 +34,24 @@ import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.util.stream.Stream; +import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; +import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Tests hoodie table meta client {@link HoodieTableMetaClient}. @@ -109,6 +123,136 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { "Commit value should be \"test-detail\""); } + private static Stream<Arguments> argumentsForInferringRecordMergeMode() { + Stream<Arguments> arguments = Stream.of( + // Record merger strategy is not set + // Payload class is set, payload type is not set + arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), + Option.empty(), Option.empty(), RecordMergeMode.OVERWRITE_WITH_LATEST), + arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), + Option.empty(), Option.empty(), RecordMergeMode.EVENT_TIME_ORDERING), + arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()), + Option.empty(), Option.empty(), RecordMergeMode.CUSTOM), + // Record merger strategy is not set + // Payload class is set, payload type is set; payload class takes precedence + arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), + Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()), + Option.empty(), RecordMergeMode.OVERWRITE_WITH_LATEST), + arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), + Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()), + Option.empty(), RecordMergeMode.EVENT_TIME_ORDERING), + arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()), + Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()), + Option.empty(), RecordMergeMode.CUSTOM), + // Record merger strategy is set to default + // Payload class is set, payload type is not set + arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), + Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.OVERWRITE_WITH_LATEST), + arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), + Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.EVENT_TIME_ORDERING), + arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()), + Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.CUSTOM), + // Record merger strategy is set to default + // Payload class is not set, payload type is set + arguments(Option.empty(), Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()), + Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.OVERWRITE_WITH_LATEST), + arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()), + Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.EVENT_TIME_ORDERING), + arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_METADATA.name()), + Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.CUSTOM), + // Record merger strategy is set to default + // Payload class or payload type is not set + arguments(Option.empty(), Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID), + RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue())), + // Record merger strategy is set to custom + arguments(Option.empty(), Option.empty(), Option.of("custom_merge_strategy"), + RecordMergeMode.CUSTOM), + arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), + Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()), + Option.of("custom_merge_strategy"), RecordMergeMode.CUSTOM) + ); + return arguments; + } + + @ParameterizedTest + @MethodSource("argumentsForInferringRecordMergeMode") + public void 
+  @ParameterizedTest
+  @MethodSource("argumentsForInferringRecordMergeMode")
+  public void testInferRecordMergeMode(Option<String> payloadClassName,
+                                       Option<String> payloadType,
+                                       Option<String> recordMergerStrategy,
+                                       RecordMergeMode expectedRecordMergeMode) {
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.MERGE_ON_READ.name())
+        .setTableName("table_name");
+    if (payloadClassName.isPresent()) {
+      builder.setPayloadClassName(payloadClassName.get());
+    }
+    if (payloadType.isPresent()) {
+      builder.setPayloadType(payloadType.get());
+    }
+    if (recordMergerStrategy.isPresent()) {
+      builder.setRecordMergerStrategy(recordMergerStrategy.get());
+    }
+    assertEquals(expectedRecordMergeMode,
+        RecordMergeMode.valueOf(builder.build().getProperty(RECORD_MERGE_MODE.key())));
+  }
+
+  private static Stream<Arguments> argumentsForValidationFailureOnMergeConfigs() {
+    Stream<Arguments> arguments = Stream.of(
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), Option.empty(),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST,
+            "Payload class name (org.apache.hudi.common.model.DefaultHoodieRecordPayload) or type "
+                + "(null) should be consistent with the record merge mode OVERWRITE_WITH_LATEST"),
+        arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST,
+            "Payload class name (null) or type (HOODIE_AVRO_DEFAULT) "
+                + "should be consistent with the record merge mode OVERWRITE_WITH_LATEST"),
+        arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), Option.empty(),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING,
+            "Payload class name (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload) or type "
+                + "(null) should be consistent with the record merge mode EVENT_TIME_ORDERING"),
+        arguments(Option.empty(), Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING,
+            "Payload class name (null) or type (OVERWRITE_LATEST_AVRO) "
+                + "should be consistent with the record merge mode EVENT_TIME_ORDERING")
+    );
+    return arguments;
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForValidationFailureOnMergeConfigs")
+  public void testValidationFailureOnMergeConfigs(Option<String> payloadClassName,
+                                                  Option<String> payloadType,
+                                                  Option<String> recordMergerStrategy,
+                                                  RecordMergeMode recordMergeMode,
+                                                  String expectedErrorMessage) {
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.MERGE_ON_READ.name())
+        .setTableName("table_name")
+        .setRecordMergeMode(recordMergeMode);
+    if (payloadClassName.isPresent()) {
+      builder.setPayloadClassName(payloadClassName.get());
+    }
+    if (payloadType.isPresent()) {
+      builder.setPayloadType(payloadType.get());
+    }
+    if (recordMergerStrategy.isPresent()) {
+      builder.setRecordMergerStrategy(recordMergerStrategy.get());
+    }
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class, builder::build);
+    assertEquals(expectedErrorMessage, exception.getMessage());
+  }
+
   @Test
   public void testEquals() throws IOException {
     HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), getTableType());
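Concretely, an inconsistent combination now fails fast when the table properties are built. For example (a sketch mirroring the first failure case above; the table name is arbitrary):

    // Illustrative: OVERWRITE_WITH_LATEST paired with the event-time payload
    // is rejected at build(), per testValidationFailureOnMergeConfigs above.
    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.MERGE_ON_READ.name())
        .setTableName("table_name")
        .setRecordMergeMode(RecordMergeMode.OVERWRITE_WITH_LATEST)
        .setPayloadClassName(DefaultHoodieRecordPayload.class.getName())
        .setRecordMergerStrategy(DEFAULT_MERGER_STRATEGY_UUID);
    assertThrows(IllegalArgumentException.class, builder::build);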
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 3e80d4bee56..4ec1c0556b0 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -58,6 +60,8 @@ public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
     readerContext = new HoodieTestReaderContext(
         Option.of(new CustomAvroMerger()),
         Option.of(HoodieRecordTestPayload.class.getName()));
+    properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
 
     // -------------------------------------------------------------
     // The test logic is as follows:
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index bf0fac19c67..3b3fc3c4359 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
@@ -55,6 +57,8 @@ public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
     readerContext = new HoodieTestReaderContext(
         Option.of(merger),
         Option.of(HoodieRecordTestPayload.class.getName()));
+    properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
 
     // -------------------------------------------------------------
     // The test logic is as follows:
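Both harnesses follow the same pattern: the declared merge mode must line up with the merger the reader context supplies. A minimal wiring sketch (assumes a `TypedProperties` instance like the harnesses' `properties` field; only the config key and enum names come from this patch):

    // Illustrative harness setup: declare the mode that matches the merger in use.
    TypedProperties properties = new TypedProperties();
    // EVENT_TIME_ORDERING keeps the record with the larger ordering-field value;
    properties.setProperty(
        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
    // with a bespoke merger such as CustomAvroMerger, declare CUSTOM instead:
    // properties.setProperty(
    //     HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());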
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index e59e65bea3e..f61db4ee247 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -19,7 +19,9 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
@@ -112,13 +114,15 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
     ctx.setRecordMerger(useCustomMerger ?
         new CustomMerger() : new HoodieSparkRecordMerger());
     ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig()));
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
         partitionNameOpt,
         partitionFields,
         ctx.getRecordMerger(),
-        new TypedProperties(),
+        props,
         1024 * 1024 * 1000,
         metaClient.getTempFolderPath(),
         ExternalSpillableMap.DiskMapType.ROCKS_DB,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 747fcb9a2eb..e20104858b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,8 +19,6 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -28,14 +26,19 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.{HoodieSparkRecordMerger, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, HoodieUnsafeUtils, Row, SaveMode, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import java.util
+
 import scala.collection.JavaConverters._
 
 /**
@@ -114,4 +117,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
     assertEquals(0, expectedDf.except(actualDf).count())
     assertEquals(0, actualDf.except(expectedDf).count())
   }
+
+  override def getComparableUTF8String(value: String): Comparable[_] = {
+    UTF8String.fromString(value)
+  }
 }
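Taken together, a reader-side setup that exercises this change end to end could look like the following sketch (illustrative; `FILE_GROUP_READER_ENABLED` is the flag imported in the Scala test above, and the ordering-field keys mirror TestHoodieFileGroupReaderBase):

    // Illustrative end-to-end reader configuration (not from the patch verbatim).
    TypedProperties props = new TypedProperties();
    // Enable the new HoodieFileGroupReader code path.
    props.setProperty(FILE_GROUP_READER_ENABLED.key(), "true");
    // Merge duplicate keys by event time instead of commit order.
    props.setProperty(HoodieCommonConfig.RECORD_MERGE_MODE.key(),
        RecordMergeMode.EVENT_TIME_ORDERING.name());
    // Ordering field consulted by event-time merging.
    props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
    props.setProperty("hoodie.payload.ordering.field", "timestamp");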