This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b4cb00ad4bb [HUDI-9378] Add tests for Flink file group reader (#13274)
b4cb00ad4bb is described below

commit b4cb00ad4bb692824528cf354066fc6ae64f9a92
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 9 14:03:03 2025 +0800

    [HUDI-9378] Add tests for Flink file group reader (#13274)
---
 .../apache/hudi/client/model/BootstrapRowData.java |   4 +-
 .../java/org/apache/hudi/util/RowDataUtils.java    |  33 +++
 .../hudi/BaseSparkInternalRowReaderContext.java    |  10 +-
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |   9 +-
 .../apache/hudi/common/util/HoodieRecordUtils.java |   3 +
 .../table/read/TestHoodieFileGroupReaderBase.java  |  21 +-
 .../table/format/FlinkRowDataReaderContext.java    |  21 +-
 .../table/format/HoodieRowDataParquetReader.java   |   8 +-
 .../table/TestHoodieFileGroupReaderOnFlink.java    | 261 +++++++++++++++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  10 +-
 10 files changed, 340 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
index 513b8b1994b..97d68e58018 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.function.Function;
 
 /**
- * RowData implementation used when reading from Spark bootstrapped table. In 
these tables, the partition values
+ * RowData implementation used when reading from Flink bootstrapped table. In 
these tables, the partition values
  * are not always written to the data files, so we need to use the values 
inferred from the file's partition path.
  */
 public class BootstrapRowData implements RowData {
@@ -60,7 +60,7 @@ public class BootstrapRowData implements RowData {
 
   @Override
   public boolean isNullAt(int pos) {
-    return !partitionOrdinalToValues.containsKey(pos) || row.isNullAt(pos);
+    return !partitionOrdinalToValues.containsKey(pos) && row.isNullAt(pos);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
index dc6d2c72f56..d1de02bfc30 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataUtils.java
@@ -20,12 +20,15 @@ package org.apache.hudi.util;
 
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.util.function.Function;
@@ -102,6 +105,36 @@ public class RowDataUtils {
     }
   }
 
+  /**
+   * Convert the native Java object to the corresponding value of Flink type.
+   *
+   * @param value Java object
+   *
+   * @return Value of Flink type
+   */
+  public static Object convertValueToFlinkType(Object value) {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof String) {
+      return StringData.fromString((String) value);
+    }
+    if (value instanceof BigDecimal) {
+      BigDecimal decimalVal = (BigDecimal) value;
+      return DecimalData.fromBigDecimal(decimalVal, decimalVal.precision(), 
decimalVal.scale());
+    }
+    if (value instanceof Timestamp) {
+      return TimestampData.fromTimestamp((Timestamp) value);
+    }
+    if (value instanceof LocalDate) {
+      return (int)(((LocalDate) value).toEpochDay());
+    }
+    if (value instanceof ByteBuffer) {
+      return ((ByteBuffer) value).array();
+    }
+    return value;
+  }
+
   /**
    * Returns the precision of the given TIMESTAMP type.
    */
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index dc2ad7f0b15..8c26ddf559c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -22,7 +22,6 @@ package org.apache.hudi;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -77,15 +76,12 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
         return Option.of(new OverwriteWithLatestSparkRecordMerger());
       case CUSTOM:
       default:
-        if 
(mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
-          return Option.of(HoodieAvroRecordMerger.INSTANCE);
-        }
-        Option<HoodieRecordMerger> mergerClass = 
HoodieRecordUtils.createValidRecordMerger(EngineType.SPARK, mergeImplClasses, 
mergeStrategyId);
-        if (mergerClass.isEmpty()) {
+        Option<HoodieRecordMerger> recordMerger = 
HoodieRecordUtils.createValidRecordMerger(EngineType.SPARK, mergeImplClasses, 
mergeStrategyId);
+        if (recordMerger.isEmpty()) {
           throw new IllegalArgumentException("No valid spark merger 
implementation set for `"
               + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
         }
-        return mergerClass;
+        return recordMerger;
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d2f360e5ae1..811a7020deb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -106,15 +106,12 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
         return Option.of(new OverwriteWithLatestMerger());
       case CUSTOM:
       default:
-        if 
(mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
-          return Option.of(HoodieAvroRecordMerger.INSTANCE);
-        }
-        Option<HoodieRecordMerger> mergerClass = 
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, 
mergeStrategyId);
-        if (mergerClass.isEmpty()) {
+        Option<HoodieRecordMerger> recordMerger = 
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, 
mergeStrategyId);
+        if (recordMerger.isEmpty()) {
           throw new IllegalArgumentException("No valid merger implementation 
set for `"
               + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
         }
-        return mergerClass;
+        return recordMerger;
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 0e3ae6bea5c..4cc59c2927f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -86,6 +86,9 @@ public class HoodieRecordUtils {
 
   public static Option<HoodieRecordMerger> createValidRecordMerger(EngineType 
engineType,
                                                                    String 
mergerImpls, String recordMergerStrategy) {
+    if 
(recordMergerStrategy.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID))
 {
+      return Option.of(HoodieAvroRecordMerger.INSTANCE);
+    }
     return 
createValidRecordMerger(engineType,ConfigUtils.split2List(mergerImpls), 
recordMergerStrategy);
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 6135b21e950..577508e131b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -262,7 +262,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
   }
 
-  private Map<String, String> getCommonConfigs(RecordMergeMode 
recordMergeMode, boolean populateMetaFields) {
+  protected Map<String, String> getCommonConfigs(RecordMergeMode 
recordMergeMode, boolean populateMetaFields) {
     Map<String, String> configMapping = new HashMap<>();
     configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
KEY_FIELD_NAME);
     configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
PARTITION_FIELD_NAME);
@@ -283,7 +283,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return configMapping;
   }
 
-  private void validateOutputFromFileGroupReader(StorageConfiguration<?> 
storageConf,
+  protected void validateOutputFromFileGroupReader(StorageConfiguration<?> 
storageConf,
                                                  String tablePath,
                                                  boolean containsBaseFile,
                                                  int expectedLogFileNum,
@@ -422,10 +422,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
           fileSlice.getTotalFileSize(),
           false,
           false)) {
-        fileGroupReader.initRecordIterators();
-        while (fileGroupReader.hasNext()) {
-          actualRecordList.add(fileGroupReader.next());
-        }
+        readWithFileGroupReader(fileGroupReader, actualRecordList, avroSchema);
       } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
@@ -433,6 +430,16 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return actualRecordList;
   }
 
+  protected void readWithFileGroupReader(
+      HoodieFileGroupReader<T> fileGroupReader,
+      List<T> recordList,
+      Schema recordSchema) throws IOException {
+    fileGroupReader.initRecordIterators();
+    while (fileGroupReader.hasNext()) {
+      recordList.add(fileGroupReader.next());
+    }
+  }
+
   private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema 
requestedSchema) {
     if (fileSlice.getLogFiles().findAny().isPresent()) {
       return true;
@@ -445,7 +452,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return false;
   }
 
-  private List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates, 
List<HoodieRecord> existing) {
+  protected List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates, 
List<HoodieRecord> existing) {
     Set<String> updatedKeys = 
updates.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
     return Stream.concat(updates.stream(), existing.stream().filter(record -> 
!updatedKeys.contains(record.getRecordKey())))
             .collect(Collectors.toList());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index bfddb031dae..1c93f4579fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -49,6 +49,7 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataUtils;
 import org.apache.hudi.util.RowProjection;
 import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
 
@@ -59,6 +60,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -109,7 +111,8 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
         (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
             .getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
-    return rowDataParquetReader.getRowDataIterator(schemaManager, 
requiredSchema);
+    DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
+    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema);
   }
 
   @Override
@@ -120,13 +123,12 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       case COMMIT_TIME_ORDERING:
         return Option.of(CommitTimeFlinkRecordMerger.INSTANCE);
       default:
-        Option<HoodieRecordMerger> mergerClass =
-            HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, 
mergeImplClasses, mergeStrategyId);
-        if (mergerClass.isEmpty()) {
+        Option<HoodieRecordMerger> recordMerger = 
HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, 
mergeStrategyId);
+        if (recordMerger.isEmpty()) {
           throw new HoodieValidationException("No valid flink merger 
implementation set for `"
               + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
         }
-        return mergerClass;
+        return recordMerger;
     }
   }
 
@@ -158,7 +160,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       RowData record,
       Schema schema,
       Option<String> orderingFieldName) {
-    if (orderingFieldName.isEmpty()) {
+    if (orderingFieldName.isEmpty() || 
schema.getField(orderingFieldName.get()) == null) {
       return DEFAULT_ORDERING_VALUE;
     }
     RowDataAvroQueryContexts.FieldQueryContext context = 
RowDataAvroQueryContexts.fromAvroSchema(schema, 
utcTimezone).getFieldQueryContext(orderingFieldName.get());
@@ -250,7 +252,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
 
   @Override
   public GenericRecord convertToAvroRecord(RowData record, Schema schema) {
-    throw new UnsupportedOperationException("FlinkRowDataReaderContext do not 
support convertToAvroRecord yet.");
+    return (GenericRecord) 
RowDataAvroQueryContexts.fromAvroSchema(schema).getRowDataToAvroConverter().convert(schema,
 record);
   }
 
   @Override
@@ -259,4 +261,9 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     AvroToRowDataConverters.AvroToRowDataConverter converter = 
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, 
utcTimezone).getAvroToRowDataConverter();
     return (RowData) converter.convert(avroRecord);
   }
+
+  @Override
+  public Comparable convertValueToEngineType(Comparable value) {
+    return (Comparable) RowDataUtils.convertValueToFlinkType(value);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index cd18cf49273..2902aea8089 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -81,7 +81,7 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
 
   @Override
   public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema 
readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, requestedSchema);
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema);
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
@@ -89,12 +89,12 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, schema);
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
Objects.toString(rowData.getString(0)));
   }
 
-  public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager 
internalSchemaManager, Schema requestedSchema) throws IOException {
-    return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, getRowType(), requestedSchema, path);
+  public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager 
internalSchemaManager, DataType dataType, Schema requestedSchema) throws 
IOException {
+    return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, dataType, requestedSchema, path);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
new file mode 100644
index 00000000000..f626c2d33bc
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.CustomPayloadForTesting;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderBase;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.format.FlinkRowDataReaderContext;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@code HoodieFileGroupReader} with {@code FlinkRowDataReaderContext} 
on Flink.
+ */
+public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderBase<RowData> {
+  private Configuration conf;
+  private static final Schema RECORD_SCHEMA = getRecordAvroSchema();
+  private static final AvroToRowDataConverters.AvroToRowDataConverter 
AVRO_CONVERTER =
+      
RowDataAvroQueryContexts.fromAvroSchema(RECORD_SCHEMA).getAvroToRowDataConverter();
+
+  @BeforeEach
+  public void setup() {
+    conf = new Configuration();
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, RECORD_SCHEMA.toString());
+    conf.set(FlinkOptions.PATH, getBasePath());
+    conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+    // use hive style partition as a workaround for HUDI-9396
+    conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
+  }
+
+  @Override
+  public StorageConfiguration<?> getStorageConf() {
+    return new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+  }
+
+  @Override
+  public String getBasePath() {
+    return tempDir.toAbsolutePath().toUri().toString();
+  }
+
+  @Override
+  public HoodieReaderContext<RowData> getHoodieReaderContext(
+      String tablePath,
+      Schema avroSchema,
+      StorageConfiguration<?> storageConf,
+      HoodieTableMetaClient metaClient) {
+    return new FlinkRowDataReaderContext(
+        storageConf,
+        () -> InternalSchemaManager.DISABLED,
+        Collections.emptyList(),
+        metaClient.getTableConfig());
+  }
+
+  @Override
+  public String getCustomPayload() {
+    return CustomPayloadForTesting.class.getName();
+  }
+
+  @Override
+  protected void readWithFileGroupReader(
+      HoodieFileGroupReader<RowData> fileGroupReader,
+      List<RowData> recordList,
+      Schema recordSchema) throws IOException {
+    RowDataSerializer rowDataSerializer = 
RowDataAvroQueryContexts.getRowDataSerializer(recordSchema);
+    fileGroupReader.initRecordIterators();
+    while (fileGroupReader.hasNext()) {
+      RowData rowData = rowDataSerializer.copy(fileGroupReader.next());
+      recordList.add(rowData);
+    }
+  }
+
+  @Override
+  public void commitToTable(List<HoodieRecord> recordList, String operation, 
Map<String, String> writeConfigs) {
+    writeConfigs.forEach((key, value) -> conf.setString(key, value));
+    conf.set(FlinkOptions.OPERATION, operation);
+    List<RowData> rowDataList = recordList.stream().map(record -> {
+      try {
+        return (RowData) 
AVRO_CONVERTER.convert(record.toIndexedRecord(RECORD_SCHEMA, 
CollectionUtils.emptyProps()).get().getData());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+
+    try {
+      TestData.writeData(rowDataList, conf);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to insert data", e);
+    }
+  }
+
+  @Override
+  public void assertRecordsEqual(Schema schema, RowData expected, RowData 
actual) {
+    TestData.assertRowDataEquals(
+        Collections.singletonList(actual),
+        Collections.singletonList(expected),
+        RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType());
+  }
+
+  @Test
+  public void testGetOrderingValue() {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    when(tableConfig.populateMetaFields()).thenReturn(true);
+    FlinkRowDataReaderContext readerContext =
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+    Schema schema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .requiredString("field1")
+        .optionalString("field2")
+        .optionalLong("ts")
+        .endRecord();
+    GenericRowData rowData = GenericRowData.of(StringData.fromString("f1"), 
StringData.fromString("f2"), 1000L);
+    assertEquals(1000L, readerContext.getOrderingValue(rowData, schema, 
Option.of("ts")));
+    assertEquals(DEFAULT_ORDERING_VALUE, 
readerContext.getOrderingValue(rowData, schema, Option.of("non_existent_col")));
+  }
+
+  @Test
+  public void getRecordKeyFromMetadataFields() {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    when(tableConfig.populateMetaFields()).thenReturn(true);
+    FlinkRowDataReaderContext readerContext =
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+    Schema schema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .requiredString(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .optionalString("field2")
+        .endRecord();
+    String key = "my_key";
+    GenericRowData rowData = GenericRowData.of(StringData.fromString(key), 
StringData.fromString("field2_val"));
+    assertEquals(key, readerContext.getRecordKey(rowData, schema));
+  }
+
+  @Test
+  public void getRecordKeySingleKey() {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    when(tableConfig.populateMetaFields()).thenReturn(false);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"field1"}));
+    FlinkRowDataReaderContext readerContext =
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+    Schema schema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .requiredString("field1")
+        .optionalString("field2")
+        .endRecord();
+    String key = "key";
+    GenericRowData rowData = GenericRowData.of(StringData.fromString(key), 
StringData.fromString("other"));
+    assertEquals(key, readerContext.getRecordKey(rowData, schema));
+  }
+
+  @Test
+  public void getRecordKeyWithMultipleKeys() {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    when(tableConfig.populateMetaFields()).thenReturn(false);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"field1", "field2"}));
+    FlinkRowDataReaderContext readerContext =
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+
+    Schema schema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .requiredString("field1")
+        .requiredString("field2")
+        .requiredString("field3")
+        .endRecord();
+    String key = "field1:va1,field2:__empty__";
+    GenericRowData rowData = GenericRowData.of(StringData.fromString("va1"), 
StringData.fromString(""), StringData.fromString("other"));
+    assertEquals(key, readerContext.getRecordKey(rowData, schema));
+  }
+
+  @ParameterizedTest
+  @MethodSource("logFileOnlyCases")
+  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode 
recordMergeMode, String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
+      // One commit; reading one file group containing a log file only
+      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+      commitToTable(initialRecords, UPSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+          initialRecords, initialRecords);
+
+      // Two commits; reading one file group containing two log files
+      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
+      List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
+      commitToTable(updates, UPSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), false, 2, recordMergeMode,
+          allRecords, CollectionUtils.combine(initialRecords, updates));
+    }
+  }
+
+  private static Schema getRecordAvroSchema() {
+    Schema recordSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+    return 
AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index a9830b2ca60..e5440c4e5c5 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -23,7 +23,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -189,15 +188,12 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
         return Option.of(new OverwriteWithLatestHiveRecordMerger());
       case CUSTOM:
       default:
-        if 
(mergeStrategyId.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
-          return Option.of(HoodieAvroRecordMerger.INSTANCE);
-        }
-        Option<HoodieRecordMerger> mergerClass = 
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, 
mergeStrategyId);
-        if (mergerClass.isEmpty()) {
+        Option<HoodieRecordMerger> recordMerger = 
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergeImplClasses, 
mergeStrategyId);
+        if (recordMerger.isEmpty()) {
           throw new IllegalArgumentException("No valid hive merger 
implementation set for `"
               + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
         }
-        return mergerClass;
+        return recordMerger;
     }
   }
 

Reply via email to