This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b4cb00ad4bb [HUDI-9378] Add tests for Flink file group reader (#13274)
b4cb00ad4bb is described below
commit b4cb00ad4bb692824528cf354066fc6ae64f9a92
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 9 14:03:03 2025 +0800
[HUDI-9378] Add tests for Flink file group reader (#13274)
---
.../apache/hudi/client/model/BootstrapRowData.java | 4 +-
.../java/org/apache/hudi/util/RowDataUtils.java | 33 +++
.../hudi/BaseSparkInternalRowReaderContext.java | 10 +-
.../apache/hudi/avro/HoodieAvroReaderContext.java | 9 +-
.../apache/hudi/common/util/HoodieRecordUtils.java | 3 +
.../table/read/TestHoodieFileGroupReaderBase.java | 21 +-
.../table/format/FlinkRowDataReaderContext.java | 21 +-
.../table/format/HoodieRowDataParquetReader.java | 8 +-
.../table/TestHoodieFileGroupReaderOnFlink.java | 261 +++++++++++++++++++++
.../hudi/hadoop/HiveHoodieReaderContext.java | 10 +-
10 files changed, 340 insertions(+), 40 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
index 513b8b1994b..97d68e58018 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
@@ -31,7 +31,7 @@ import java.util.Map;
import java.util.function.Function;
/**
- * RowData implementation used when reading from Spark bootstrapped table. In these tables, the partition values
+ * RowData implementation used when reading from Flink bootstrapped table. In these tables, the partition values
* are not always written to the data files, so we need to use the values inferred from the file's partition path.
*/
public class BootstrapRowData implements RowData {
@@ -60,7 +60,7 @@ public class BootstrapRowData implements RowData {
@Override
public boolean isNullAt(int pos) {
- return !partitionOrdinalToValues.containsKey(pos) || row.isNullAt(pos);
+ return !partitionOrdinalToValues.containsKey(pos) && row.isNullAt(pos);
}
@Override
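
For context (this note and sketch are not part of the patch): with the corrected predicate, a position backed by a value inferred from the partition path is never reported as null, while every other position defers to the wrapped row. A minimal standalone sketch, using a hypothetical partition-ordinal map:

import java.util.Collections;
import java.util.Map;

class IsNullAtSketch {
  // Hypothetical: column 2 is a partition column whose value comes from the partition path.
  static final Map<Integer, String> PARTITION_ORDINAL_TO_VALUES = Collections.singletonMap(2, "2025-05-09");

  // Mirrors the fixed predicate: !containsKey(pos) && row.isNullAt(pos).
  static boolean isNullAt(int pos, boolean rowReportsNull) {
    return !PARTITION_ORDINAL_TO_VALUES.containsKey(pos) && rowReportsNull;
  }

  public static void main(String[] args) {
    System.out.println(isNullAt(2, true));  // false: the inferred partition value fills the gap in the data file
    System.out.println(isNullAt(0, true));  // true: ordinary column, underlying row is null
    System.out.println(isNullAt(0, false)); // false
  }
}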
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
index dc6d2c72f56..d1de02bfc30 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
@@ -20,12 +20,15 @@ package org.apache.hudi.util;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.util.function.Function;
@@ -102,6 +105,36 @@ public class RowDataUtils {
}
}
+ /**
+ * Convert the native Java object to the corresponding value of Flink type.
+ *
+ * @param value Java object
+ *
+ * @return Value of Flink type
+ */
+ public static Object convertValueToFlinkType(Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof String) {
+ return StringData.fromString((String) value);
+ }
+ if (value instanceof BigDecimal) {
+ BigDecimal decimalVal = (BigDecimal) value;
+ return DecimalData.fromBigDecimal(decimalVal, decimalVal.precision(), decimalVal.scale());
+ }
+ if (value instanceof Timestamp) {
+ return TimestampData.fromTimestamp((Timestamp) value);
+ }
+ if (value instanceof LocalDate) {
+ return (int)(((LocalDate) value).toEpochDay());
+ }
+ if (value instanceof ByteBuffer) {
+ return ((ByteBuffer) value).array();
+ }
+ return value;
+ }
+
/**
* Returns the precision of the given TIMESTAMP type.
*/
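
As an illustration (not part of the patch), the new helper maps engine-neutral Java values onto Flink's internal representations: String to StringData, BigDecimal to DecimalData, java.sql.Timestamp to TimestampData, LocalDate to epoch days, ByteBuffer to byte[], and anything else passes through unchanged. A rough usage sketch, assuming RowDataUtils above is on the classpath:

import org.apache.hudi.util.RowDataUtils;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;

class ConvertValueSketch {
  public static void main(String[] args) {
    // String -> StringData
    System.out.println(RowDataUtils.convertValueToFlinkType("uuid-1"));
    // BigDecimal -> DecimalData with the value's own precision and scale (here 4, 2)
    System.out.println(RowDataUtils.convertValueToFlinkType(new BigDecimal("12.34")));
    // java.sql.Timestamp -> TimestampData
    System.out.println(RowDataUtils.convertValueToFlinkType(Timestamp.valueOf("2025-05-09 14:03:03")));
    // LocalDate -> int days since epoch, matching Flink's internal DATE representation
    System.out.println(RowDataUtils.convertValueToFlinkType(LocalDate.of(2025, 5, 9)));
    // Anything else (e.g. Long) is returned unchanged
    System.out.println(RowDataUtils.convertValueToFlinkType(42L));
  }
}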
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index dc2ad7f0b15..8c26ddf559c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -22,7 +22,6 @@ package org.apache.hudi;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -77,15 +76,12 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
return Option.of(new OverwriteWithLatestSparkRecordMerger());
case CUSTOM:
default:
- if (mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
- return Option.of(HoodieAvroRecordMerger.INSTANCE);
- }
- Option<HoodieRecordMerger> mergerClass = HoodieRecordUtils.createValidRecordMerger(EngineType.SPARK, mergeImplClasses, mergeStrategyId);
- if (mergerClass.isEmpty()) {
+ Option<HoodieRecordMerger> recordMerger = HoodieRecordUtils.createValidRecordMerger(EngineType.SPARK, mergeImplClasses, mergeStrategyId);
+ if (recordMerger.isEmpty()) {
throw new IllegalArgumentException("No valid spark merger
implementation set for `"
+ RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
}
- return mergerClass;
+ return recordMerger;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d2f360e5ae1..811a7020deb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -106,15 +106,12 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
return Option.of(new OverwriteWithLatestMerger());
case CUSTOM:
default:
- if (mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
- return Option.of(HoodieAvroRecordMerger.INSTANCE);
- }
- Option<HoodieRecordMerger> mergerClass = HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, mergeStrategyId);
- if (mergerClass.isEmpty()) {
+ Option<HoodieRecordMerger> recordMerger = HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, mergeStrategyId);
+ if (recordMerger.isEmpty()) {
throw new IllegalArgumentException("No valid merger implementation
set for `"
+ RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
}
- return mergerClass;
+ return recordMerger;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 0e3ae6bea5c..4cc59c2927f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -86,6 +86,9 @@ public class HoodieRecordUtils {
public static Option<HoodieRecordMerger> createValidRecordMerger(EngineType engineType,
String mergerImpls, String recordMergerStrategy) {
+ if (recordMergerStrategy.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
+ return Option.of(HoodieAvroRecordMerger.INSTANCE);
+ }
return createValidRecordMerger(engineType,ConfigUtils.split2List(mergerImpls), recordMergerStrategy);
}
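
A hedged usage sketch of the hoisted short-circuit (the argument values below are made up for the example): when the payload-based merge strategy id is passed, the avro merger is returned before the engine-specific merger implementation classes are consulted:

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;

class PayloadStrategySketch {
  public static void main(String[] args) {
    Option<HoodieRecordMerger> merger = HoodieRecordUtils.createValidRecordMerger(
        EngineType.FLINK,
        "", // hypothetical empty merger-impl list; not consulted on this path
        HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
    System.out.println(merger.get().getClass().getSimpleName()); // HoodieAvroRecordMerger
  }
}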
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 6135b21e950..577508e131b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -262,7 +262,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
}
- private Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode, boolean populateMetaFields) {
+ protected Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode, boolean populateMetaFields) {
Map<String, String> configMapping = new HashMap<>();
configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), KEY_FIELD_NAME);
configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), PARTITION_FIELD_NAME);
@@ -283,7 +283,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return configMapping;
}
- private void validateOutputFromFileGroupReader(StorageConfiguration<?> storageConf,
+ protected void validateOutputFromFileGroupReader(StorageConfiguration<?> storageConf,
String tablePath,
boolean containsBaseFile,
int expectedLogFileNum,
@@ -422,10 +422,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
fileSlice.getTotalFileSize(),
false,
false)) {
- fileGroupReader.initRecordIterators();
- while (fileGroupReader.hasNext()) {
- actualRecordList.add(fileGroupReader.next());
- }
+ readWithFileGroupReader(fileGroupReader, actualRecordList, avroSchema);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -433,6 +430,16 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return actualRecordList;
}
+ protected void readWithFileGroupReader(
+ HoodieFileGroupReader<T> fileGroupReader,
+ List<T> recordList,
+ Schema recordSchema) throws IOException {
+ fileGroupReader.initRecordIterators();
+ while (fileGroupReader.hasNext()) {
+ recordList.add(fileGroupReader.next());
+ }
+ }
+
private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema requestedSchema) {
if (fileSlice.getLogFiles().findAny().isPresent()) {
return true;
@@ -445,7 +452,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return false;
}
- private List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates, List<HoodieRecord> existing) {
+ protected List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates, List<HoodieRecord> existing) {
Set<String> updatedKeys = updates.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
return Stream.concat(updates.stream(), existing.stream().filter(record -> !updatedKeys.contains(record.getRecordKey())))
.collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index bfddb031dae..1c93f4579fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -49,6 +49,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
@@ -59,6 +60,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -109,7 +111,8 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
(HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
- return rowDataParquetReader.getRowDataIterator(schemaManager, requiredSchema);
+ DataType rowType = RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
+ return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema);
}
@Override
@@ -120,13 +123,12 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
case COMMIT_TIME_ORDERING:
return Option.of(CommitTimeFlinkRecordMerger.INSTANCE);
default:
- Option<HoodieRecordMerger> mergerClass =
- HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
- if (mergerClass.isEmpty()) {
+ Option<HoodieRecordMerger> recordMerger = HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
+ if (recordMerger.isEmpty()) {
throw new HoodieValidationException("No valid flink merger
implementation set for `"
+ RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
}
- return mergerClass;
+ return recordMerger;
}
}
@@ -158,7 +160,7 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
RowData record,
Schema schema,
Option<String> orderingFieldName) {
- if (orderingFieldName.isEmpty()) {
+ if (orderingFieldName.isEmpty() || schema.getField(orderingFieldName.get()) == null) {
return DEFAULT_ORDERING_VALUE;
}
RowDataAvroQueryContexts.FieldQueryContext context = RowDataAvroQueryContexts.fromAvroSchema(schema, utcTimezone).getFieldQueryContext(orderingFieldName.get());
@@ -250,7 +252,7 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
@Override
public GenericRecord convertToAvroRecord(RowData record, Schema schema) {
- throw new UnsupportedOperationException("FlinkRowDataReaderContext do not support convertToAvroRecord yet.");
+ return (GenericRecord) RowDataAvroQueryContexts.fromAvroSchema(schema).getRowDataToAvroConverter().convert(schema, record);
}
@Override
@@ -259,4 +261,9 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter();
return (RowData) converter.convert(avroRecord);
}
+
+ @Override
+ public Comparable convertValueToEngineType(Comparable value) {
+ return (Comparable) RowDataUtils.convertValueToFlinkType(value);
+ }
}
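
For illustration only: a small sketch of the RowData-to-Avro conversion path that convertToAvroRecord now delegates to. The schema and row below are made up for the example; the schema's field order has to match the RowData positions:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.util.RowDataAvroQueryContexts;

class RowDataToAvroSketch {
  public static void main(String[] args) {
    // Hypothetical two-field record schema.
    Schema schema = SchemaBuilder.builder()
        .record("test").fields()
        .requiredString("uuid")
        .requiredLong("ts")
        .endRecord();
    GenericRowData row = GenericRowData.of(StringData.fromString("key-1"), 1000L);
    // Same converter chain used by convertToAvroRecord above.
    GenericRecord avro = (GenericRecord) RowDataAvroQueryContexts.fromAvroSchema(schema)
        .getRowDataToAvroConverter()
        .convert(schema, row);
    System.out.println(avro);
  }
}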
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index cd18cf49273..2902aea8089 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -81,7 +81,7 @@ public class HoodieRowDataParquetReader implements HoodieFileReader<RowData> {
@Override
public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, requestedSchema);
+ ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema);
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@@ -89,12 +89,12 @@ public class HoodieRowDataParquetReader implements HoodieFileReader<RowData> {
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
Schema schema = HoodieAvroUtils.getRecordKeySchema();
- ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, schema);
+ ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
return new CloseableMappingIterator<>(rowDataItr, rowData -> Objects.toString(rowData.getString(0)));
}
- public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager internalSchemaManager, Schema requestedSchema) throws IOException {
- return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, getRowType(), requestedSchema, path);
+ public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager internalSchemaManager, DataType dataType, Schema requestedSchema) throws IOException {
+ return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
new file mode 100644
index 00000000000..f626c2d33bc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.CustomPayloadForTesting;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderBase;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.format.FlinkRowDataReaderContext;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@code HoodieFileGroupReader} with {@code FlinkRowDataReaderContext} on Flink.
+ */
+public class TestHoodieFileGroupReaderOnFlink extends TestHoodieFileGroupReaderBase<RowData> {
+ private Configuration conf;
+ private static final Schema RECORD_SCHEMA = getRecordAvroSchema();
+ private static final AvroToRowDataConverters.AvroToRowDataConverter AVRO_CONVERTER =
+ RowDataAvroQueryContexts.fromAvroSchema(RECORD_SCHEMA).getAvroToRowDataConverter();
+
+ @BeforeEach
+ public void setup() {
+ conf = new Configuration();
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, RECORD_SCHEMA.toString());
+ conf.set(FlinkOptions.PATH, getBasePath());
+ conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+ // use hive style partition as a workaround for HUDI-9396
+ conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
+ }
+
+ @Override
+ public StorageConfiguration<?> getStorageConf() {
+ return new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+ }
+
+ @Override
+ public String getBasePath() {
+ return tempDir.toAbsolutePath().toUri().toString();
+ }
+
+ @Override
+ public HoodieReaderContext<RowData> getHoodieReaderContext(
+ String tablePath,
+ Schema avroSchema,
+ StorageConfiguration<?> storageConf,
+ HoodieTableMetaClient metaClient) {
+ return new FlinkRowDataReaderContext(
+ storageConf,
+ () -> InternalSchemaManager.DISABLED,
+ Collections.emptyList(),
+ metaClient.getTableConfig());
+ }
+
+ @Override
+ public String getCustomPayload() {
+ return CustomPayloadForTesting.class.getName();
+ }
+
+ @Override
+ protected void readWithFileGroupReader(
+ HoodieFileGroupReader<RowData> fileGroupReader,
+ List<RowData> recordList,
+ Schema recordSchema) throws IOException {
+ RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(recordSchema);
+ fileGroupReader.initRecordIterators();
+ while (fileGroupReader.hasNext()) {
+ RowData rowData = rowDataSerializer.copy(fileGroupReader.next());
+ recordList.add(rowData);
+ }
+ }
+
+ @Override
+ public void commitToTable(List<HoodieRecord> recordList, String operation, Map<String, String> writeConfigs) {
+ writeConfigs.forEach((key, value) -> conf.setString(key, value));
+ conf.set(FlinkOptions.OPERATION, operation);
+ List<RowData> rowDataList = recordList.stream().map(record -> {
+ try {
+ return (RowData) AVRO_CONVERTER.convert(record.toIndexedRecord(RECORD_SCHEMA, CollectionUtils.emptyProps()).get().getData());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+
+ try {
+ TestData.writeData(rowDataList, conf);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to insert data", e);
+ }
+ }
+
+ @Override
+ public void assertRecordsEqual(Schema schema, RowData expected, RowData actual) {
+ TestData.assertRowDataEquals(
+ Collections.singletonList(actual),
+ Collections.singletonList(expected),
+ RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType());
+ }
+
+ @Test
+ public void testGetOrderingValue() {
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ when(tableConfig.populateMetaFields()).thenReturn(true);
+ FlinkRowDataReaderContext readerContext =
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ Schema schema = SchemaBuilder.builder()
+ .record("test")
+ .fields()
+ .requiredString("field1")
+ .optionalString("field2")
+ .optionalLong("ts")
+ .endRecord();
+ GenericRowData rowData = GenericRowData.of(StringData.fromString("f1"),
StringData.fromString("f2"), 1000L);
+ assertEquals(1000L, readerContext.getOrderingValue(rowData, schema,
Option.of("ts")));
+ assertEquals(DEFAULT_ORDERING_VALUE,
readerContext.getOrderingValue(rowData, schema, Option.of("non_existent_col")));
+ }
+
+ @Test
+ public void getRecordKeyFromMetadataFields() {
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ when(tableConfig.populateMetaFields()).thenReturn(true);
+ FlinkRowDataReaderContext readerContext =
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ Schema schema = SchemaBuilder.builder()
+ .record("test")
+ .fields()
+ .requiredString(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+ .optionalString("field2")
+ .endRecord();
+ String key = "my_key";
+ GenericRowData rowData = GenericRowData.of(StringData.fromString(key), StringData.fromString("field2_val"));
+ assertEquals(key, readerContext.getRecordKey(rowData, schema));
+ }
+
+ @Test
+ public void getRecordKeySingleKey() {
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ when(tableConfig.populateMetaFields()).thenReturn(false);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1"}));
+ FlinkRowDataReaderContext readerContext =
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ Schema schema = SchemaBuilder.builder()
+ .record("test")
+ .fields()
+ .requiredString("field1")
+ .optionalString("field2")
+ .endRecord();
+ String key = "key";
+ GenericRowData rowData = GenericRowData.of(StringData.fromString(key), StringData.fromString("other"));
+ assertEquals(key, readerContext.getRecordKey(rowData, schema));
+ }
+
+ @Test
+ public void getRecordKeyWithMultipleKeys() {
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ when(tableConfig.populateMetaFields()).thenReturn(false);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1", "field2"}));
+ FlinkRowDataReaderContext readerContext =
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+
+ Schema schema = SchemaBuilder.builder()
+ .record("test")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .requiredString("field3")
+ .endRecord();
+ String key = "field1:va1,field2:__empty__";
+ GenericRowData rowData = GenericRowData.of(StringData.fromString("va1"),
StringData.fromString(""), StringData.fromString("other"));
+ assertEquals(key, readerContext.getRecordKey(rowData, schema));
+ }
+
+ @ParameterizedTest
+ @MethodSource("logFileOnlyCases")
+ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+
+ try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+ // One commit; reading one file group containing a log file only
+ List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+ commitToTable(initialRecords, UPSERT.value(), writeConfigs);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+ initialRecords, initialRecords);
+
+ // Two commits; reading one file group containing two log files
+ List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
+ List<HoodieRecord> allRecords = mergeRecordLists(updates, initialRecords);
+ commitToTable(updates, UPSERT.value(), writeConfigs);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), false, 2, recordMergeMode,
+ allRecords, CollectionUtils.combine(initialRecords, updates));
+ }
+ }
+
+ private static Schema getRecordAvroSchema() {
+ Schema recordSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+ return AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
+ }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index a9830b2ca60..e5440c4e5c5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -23,7 +23,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -189,15 +188,12 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
return Option.of(new OverwriteWithLatestHiveRecordMerger());
case CUSTOM:
default:
- if (mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
- return Option.of(HoodieAvroRecordMerger.INSTANCE);
- }
- Option<HoodieRecordMerger> mergerClass = HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, mergeStrategyId);
- if (mergerClass.isEmpty()) {
+ Option<HoodieRecordMerger> recordMerger = HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, mergeStrategyId);
+ if (recordMerger.isEmpty()) {
throw new IllegalArgumentException("No valid hive merger
implementation set for `"
+ RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
}
- return mergerClass;
+ return recordMerger;
}
}