This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21275279f58 [HUDI-8659] Remove ENUM type check and add other fixes for 
file group reader-based compaction (#12427)
21275279f58 is described below

commit 21275279f58d75b951fd7c1321aed4b71f287c3e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Dec 5 04:35:29 2024 -0800

    [HUDI-8659] Remove ENUM type check and add other fixes for file group 
reader-based compaction (#12427)
    
    * [HUDI-8659] Remove ENUM type check for file group reader
    
    * Fix validation in GenericRecordValidationTestUtils
    
    * Disable file group reader for log compaction
    
    * Add log read time to the runtime stat in the write stat for the new merge 
handle
    
    * Add more stats
    
    * Fix testRecordGenerationAPIsForMOR
---
 .../hudi/table/action/compact/HoodieCompactor.java |   9 +-
 .../compact/RunCompactionActionExecutor.java       |   2 +-
 .../GenericRecordValidationTestUtils.java          |   4 +
 ...HoodieSparkFileGroupReaderBasedMergeHandle.java |  33 +++++--
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  28 ------
 .../apache/hudi/common/model/HoodieWriteStat.java  | 100 +--------------------
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  18 ++--
 .../common/table/read/HoodieFileGroupReader.java   |  46 +++++++---
 .../table/read/HoodieFileGroupRecordBuffer.java    |   5 --
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |   5 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |   5 +-
 .../hudi/common/table/read/HoodieReadStats.java    |  80 +++++++++++++++++
 .../read/HoodieUnmergedFileGroupRecordBuffer.java  |   5 +-
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |   6 +-
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |   6 +-
 15 files changed, 176 insertions(+), 176 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 96f9316902d..88d90a850cf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -102,7 +102,8 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
    * Execute compaction operations and report back status.
    */
   public HoodieData<WriteStatus> compact(
-      HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+      HoodieEngineContext context, WriteOperationType operationType,
+      HoodieCompactionPlan compactionPlan,
       HoodieTable table, HoodieWriteConfig config, String 
compactionInstantTime,
       HoodieCompactionHandler compactionHandler) {
     if (compactionPlan == null || (compactionPlan.getOperations() == null)
@@ -145,9 +146,9 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
     boolean useFileGroupReaderBasedCompaction = 
context.supportsFileGroupReader()   // the engine needs to support fg reader 
first
         && !metaClient.isMetadataTable()
         && 
config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+        && operationType == WriteOperationType.COMPACT
         && !hasBootstrapFile(operations)                                       
     // bootstrap file read for fg reader is not ready
         && StringUtils.isNullOrEmpty(config.getInternalSchema())               
     // schema evolution support for fg reader is not ready
-        && !containsUnsupportedTypesForFileGroupReader(config.getSchema())     
     // Enum type support by fg reader is not ready
         && config.populateMetaFields();                                        
     // Virtual key support by fg reader is not ready
 
     if (useFileGroupReaderBasedCompaction) {
@@ -325,8 +326,4 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
   private boolean hasBootstrapFile(List<CompactionOperation> operationList) {
     return operationList.stream().anyMatch(operation -> 
operation.getBootstrapFilePath().isPresent());
   }
-
-  private boolean containsUnsupportedTypesForFileGroupReader(String schemaStr) 
{
-    return HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(new 
Schema.Parser().parse(schemaStr));
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 066776e1743..276bec1b9bb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -106,7 +106,7 @@ public class RunCompactionActionExecutor<T> extends
       }
 
       HoodieData<WriteStatus> statuses = compactor.compact(
-          context, compactionPlan, table, configCopy, instantTime, 
compactionHandler);
+          context, operationType, compactionPlan, table, configCopy, 
instantTime, compactionHandler);
 
       compactor.maybePersist(statuses, context, config, instantTime);
       context.setJobStatus(this.getClass().getSimpleName(), "Preparing 
compaction metadata: " + config.getTableName());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 1b0ab98ad03..67d189ed1b1 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -81,6 +81,10 @@ public class GenericRecordValidationTestUtils {
               
HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) value2));
         } else if (value1 instanceof Text && value2 instanceof BytesWritable) {
           assertArrayEquals(((Text) value1).getBytes(), ((BytesWritable) 
value2).getBytes());
+        } else if (f.schema().getType() == Schema.Type.ENUM
+            && value1 instanceof BytesWritable && value2 instanceof Text) {
+          // TODO(HUDI-8660): Revisit ENUM handling in Spark parquet reader 
and writer
+          assertArrayEquals(((BytesWritable) value1).getBytes(), ((Text) 
value2).getBytes());
         } else {
           assertEquals(value1, value2, "Field name " + fieldName + " is not 
same."
               + " Val1: " + value1 + ", Val2:" + value2);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index b7d8d2f80f2..947ce08efee 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -82,6 +83,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, 
K, O> extends Hood
   protected HoodieReaderContext readerContext;
   protected FileSlice fileSlice;
   protected Configuration conf;
+  protected HoodieReadStats readStats;
 
   public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, 
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                                     CompactionOperation 
operation, TaskContextSupplier taskContextSupplier,
@@ -225,11 +227,11 @@ public class 
HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
 
         // The stats of inserts, updates, and deletes are updated once at the 
end
         // These will be set in the write stat when closing the merge handle
-        HoodieReadStats stats = fileGroupReader.getStats();
-        this.insertRecordsWritten = stats.getNumInserts();
-        this.updatedRecordsWritten = stats.getNumUpdates();
-        this.recordsDeleted = stats.getNumDeletes();
-        this.recordsWritten = stats.getNumInserts() + stats.getNumUpdates();
+        this.readStats = fileGroupReader.getStats();
+        this.insertRecordsWritten = readStats.getNumInserts();
+        this.updatedRecordsWritten = readStats.getNumUpdates();
+        this.recordsDeleted = readStats.getNumDeletes();
+        this.recordsWritten = readStats.getNumInserts() + 
readStats.getNumUpdates();
       }
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to compact file slice: " + 
fileSlice, e);
@@ -265,4 +267,25 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, 
I, K, O> extends Hood
   protected void writeIncomingRecords() {
     // no operation.
   }
+
+  @Override
+  public List<WriteStatus> close() {
+    try {
+      super.close();
+      
writeStatus.getStat().setTotalLogReadTimeMs(readStats.getTotalLogReadTimeMs());
+      
writeStatus.getStat().setTotalUpdatedRecordsCompacted(readStats.getTotalUpdatedRecordsCompacted());
+      
writeStatus.getStat().setTotalLogFilesCompacted(readStats.getTotalLogFilesCompacted());
+      writeStatus.getStat().setTotalLogRecords(readStats.getTotalLogRecords());
+      writeStatus.getStat().setTotalLogBlocks(readStats.getTotalLogBlocks());
+      
writeStatus.getStat().setTotalCorruptLogBlock(readStats.getTotalCorruptLogBlock());
+      
writeStatus.getStat().setTotalRollbackBlocks(readStats.getTotalRollbackBlocks());
+
+      if (writeStatus.getStat().getRuntimeStats() != null) {
+        
writeStatus.getStat().getRuntimeStats().setTotalScanTime(readStats.getTotalLogReadTimeMs());
+      }
+      return Collections.singletonList(writeStatus);
+    } catch (Exception e) {
+      throw new HoodieUpsertException("Failed to close 
HoodieSparkFileGroupReaderBasedMergeHandle", e);
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 06380d36419..7e67e41581e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1487,32 +1487,4 @@ public class HoodieAvroUtils {
       throw new UnsupportedOperationException(String.format("Unsupported type 
of the value (%s)", avroValueWrapper.getClass()));
     }
   }
-
-  /**
-   * Returns whether the schema contains types not supported by the file group 
reader.
-   * Right now only ENUM type in Avro has known issues.
-   *
-   * @param schema Avro schema
-   *
-   * @return whether the schema contains types not supported by the file group 
reader.
-   */
-  public static boolean containsUnsupportedTypesForFileGroupReader(Schema 
schema) {
-    switch (schema.getType()) {
-      case RECORD:
-        for (Field field : schema.getFields()) {
-          if (containsUnsupportedTypesForFileGroupReader(field.schema())) {
-            return true;
-          }
-        }
-        return false;
-      case ARRAY:
-        return 
containsUnsupportedTypesForFileGroupReader(schema.getElementType());
-      case MAP:
-        return 
containsUnsupportedTypesForFileGroupReader(schema.getValueType());
-      case UNION:
-        return 
containsUnsupportedTypesForFileGroupReader(getActualSchemaFromUnion(schema, 
null));
-      default:
-        return schema.getType() == Schema.Type.ENUM;
-    }
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 5a415847277..395ba9fdfc2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -91,48 +91,6 @@ public class HoodieWriteStat extends HoodieReadStats {
   @Nullable
   private String partitionPath;
 
-  /**
-   * Total number of log records that were compacted by a compaction operation.
-   */
-  @Nullable
-  private long totalLogRecords;
-
-  /**
-   * Total number of log files compacted for a file slice with this base 
fileid.
-   */
-  @Nullable
-  private long totalLogFilesCompacted;
-
-  /**
-   * Total size of all log files for a file slice with this base fileid.
-   */
-  @Nullable
-  private long totalLogSizeCompacted;
-
-  /**
-   * Total number of records updated by a compaction operation.
-   */
-  @Nullable
-  private long totalUpdatedRecordsCompacted;
-
-  /**
-   * Total number of log blocks seen in a compaction operation.
-   */
-  @Nullable
-  private long totalLogBlocks;
-
-  /**
-   * Total number of corrupt blocks seen in a compaction operation.
-   */
-  @Nullable
-  private long totalCorruptLogBlock;
-
-  /**
-   * Total number of rollback blocks seen in a compaction operation.
-   */
-  @Nullable
-  private long totalRollbackBlocks;
-
   /**
    * File Size as of close.
    */
@@ -241,30 +199,6 @@ public class HoodieWriteStat extends HoodieReadStats {
     this.partitionPath = partitionPath;
   }
 
-  public long getTotalLogRecords() {
-    return totalLogRecords;
-  }
-
-  public void setTotalLogRecords(long totalLogRecords) {
-    this.totalLogRecords = totalLogRecords;
-  }
-
-  public long getTotalLogFilesCompacted() {
-    return totalLogFilesCompacted;
-  }
-
-  public void setTotalLogFilesCompacted(long totalLogFilesCompacted) {
-    this.totalLogFilesCompacted = totalLogFilesCompacted;
-  }
-
-  public long getTotalUpdatedRecordsCompacted() {
-    return totalUpdatedRecordsCompacted;
-  }
-
-  public void setTotalUpdatedRecordsCompacted(long 
totalUpdatedRecordsCompacted) {
-    this.totalUpdatedRecordsCompacted = totalUpdatedRecordsCompacted;
-  }
-
   public void setTempPath(String tempPath) {
     this.tempPath = tempPath;
   }
@@ -272,39 +206,7 @@ public class HoodieWriteStat extends HoodieReadStats {
   public String getTempPath() {
     return this.tempPath;
   }
-
-  public long getTotalLogSizeCompacted() {
-    return totalLogSizeCompacted;
-  }
-
-  public void setTotalLogSizeCompacted(long totalLogSizeCompacted) {
-    this.totalLogSizeCompacted = totalLogSizeCompacted;
-  }
-
-  public long getTotalLogBlocks() {
-    return totalLogBlocks;
-  }
-
-  public void setTotalLogBlocks(long totalLogBlocks) {
-    this.totalLogBlocks = totalLogBlocks;
-  }
-
-  public long getTotalCorruptLogBlock() {
-    return totalCorruptLogBlock;
-  }
-
-  public void setTotalCorruptLogBlock(long totalCorruptLogBlock) {
-    this.totalCorruptLogBlock = totalCorruptLogBlock;
-  }
-
-  public long getTotalRollbackBlocks() {
-    return totalRollbackBlocks;
-  }
-
-  public void setTotalRollbackBlocks(long totalRollbackBlocks) {
-    this.totalRollbackBlocks = totalRollbackBlocks;
-  }
-
+  
   public long getFileSizeInBytes() {
     return fileSizeInBytes;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 96a74860e8a..1f2879a70f9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -90,7 +90,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected final Option<String> payloadClass;
   protected final TypedProperties props;
   protected final ExternalSpillableMap<Serializable, Pair<Option<T>, 
Map<String, Object>>> records;
-  protected final HoodieReadStats readerStats;
+  protected final HoodieReadStats readStats;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
@@ -103,7 +103,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                          RecordMergeMode recordMergeMode,
                                          Option<String> 
partitionNameOverrideOpt,
                                          Option<String[]> 
partitionPathFieldOpt,
-                                         TypedProperties props) {
+                                         TypedProperties props,
+                                         HoodieReadStats readStats) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
@@ -127,7 +128,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
         
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
     boolean isBitCaskDiskMapCompressionEnabled = 
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
-    this.readerStats = new HoodieReadStats();
+    this.readStats = readStats;
     try {
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -142,11 +143,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     this.baseFileIterator = baseFileIterator;
   }
 
-  @Override
-  public HoodieReadStats getStats() {
-    return readerStats;
-  }
-
   /**
    * This allows hasNext() to be called multiple times without incrementing 
the iterator by more than 1
    * record. It does come with the caveat that hasNext() must be called every 
time before next(). But
@@ -511,18 +507,18 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
       if (resultRecord.isPresent()) {
         // Updates
         nextRecord = readerContext.seal(resultRecord.get());
-        readerStats.incrementNumUpdates();
+        readStats.incrementNumUpdates();
         return true;
       } else {
         // Deletes
-        readerStats.incrementNumDeletes();
+        readStats.incrementNumDeletes();
         return false;
       }
     }
 
     // Inserts
     nextRecord = readerContext.seal(baseRecord);
-    readerStats.incrementNumInserts();
+    readStats.incrementNumInserts();
     return true;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 5bac955e22d..003a5ffa3f7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
@@ -80,6 +79,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private final HoodieFileGroupRecordBuffer<T> recordBuffer;
   private ClosableIterator<T> baseFileIterator;
   private final Option<UnaryOperator<T>> outputConverter;
+  private final HoodieReadStats readStats;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                HoodieStorage storage,
@@ -120,22 +120,34 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props)
         : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
-    this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, 
tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge, 
shouldUseRecordPosition);
+    this.readStats = new HoodieReadStats();
+    this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
+        tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), 
isSkipMerge,
+        shouldUseRecordPosition, readStats);
   }
 
   /**
    * Initialize correct record buffer
    */
-  private static HoodieFileGroupRecordBuffer 
getRecordBuffer(HoodieReaderContext readerContext, HoodieTableMetaClient 
hoodieTableMetaClient, RecordMergeMode recordMergeMode,
-                                                             TypedProperties 
props, boolean hasNoLogFiles, boolean isSkipMerge, boolean 
shouldUseRecordPosition) {
+  private static HoodieFileGroupRecordBuffer 
getRecordBuffer(HoodieReaderContext readerContext,
+                                                             
HoodieTableMetaClient hoodieTableMetaClient,
+                                                             RecordMergeMode 
recordMergeMode,
+                                                             TypedProperties 
props,
+                                                             boolean 
hasNoLogFiles,
+                                                             boolean 
isSkipMerge,
+                                                             boolean 
shouldUseRecordPosition,
+                                                             HoodieReadStats 
readStats) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
-      return new HoodieUnmergedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+      return new HoodieUnmergedFileGroupRecordBuffer<>(
+          readerContext, hoodieTableMetaClient, recordMergeMode, 
Option.empty(), Option.empty(), props, readStats);
     } else if (shouldUseRecordPosition) {
-      return new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+      return new HoodiePositionBasedFileGroupRecordBuffer<>(
+          readerContext, hoodieTableMetaClient, recordMergeMode, 
Option.empty(), Option.empty(), props, readStats);
     } else {
-      return new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+      return new HoodieKeyBasedFileGroupRecordBuffer<>(
+          readerContext, hoodieTableMetaClient, recordMergeMode, 
Option.empty(), Option.empty(), props, readStats);
     }
   }
 
@@ -237,10 +249,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     }
   }
 
+  /**
+   * @return statistics of reading a file group.
+   */
   public HoodieReadStats getStats() {
-    ValidationUtils.checkArgument(recordBuffer != null,
-        "Only support getting reader stats from log merging now");
-    return recordBuffer.getStats();
+    return readStats;
   }
 
   /**
@@ -256,7 +269,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
 
   private void scanLogFiles() {
     String path = readerContext.getTablePath();
-    HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
+    try (HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
         .withHoodieReaderContext(readerContext)
         .withStorage(storage)
         .withLogFiles(logFiles)
@@ -265,8 +278,15 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         .withPartition(getRelativePartitionPath(
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordBuffer(recordBuffer)
-        .build();
-    logRecordReader.close();
+        .build()) {
+      
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
+      
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
+      readStats.setTotalLogFilesCompacted(logRecordReader.getTotalLogFiles());
+      readStats.setTotalLogRecords(logRecordReader.getTotalLogRecords());
+      readStats.setTotalLogBlocks(logRecordReader.getTotalLogBlocks());
+      
readStats.setTotalCorruptLogBlock(logRecordReader.getTotalCorruptBlocks());
+      readStats.setTotalRollbackBlocks(logRecordReader.getTotalRollbacks());
+    }
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 139c3529475..d9ba8bcd90e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -107,11 +107,6 @@ public interface HoodieFileGroupRecordBuffer<T> {
    */
   void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
 
-  /**
-   * @return statistics of log merging.
-   */
-  HoodieReadStats getStats();
-
   /**
    * Check if next merged record exists.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 97712fe6be7..f501a3e78f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
                                              RecordMergeMode recordMergeMode,
                                              Option<String> 
partitionNameOverrideOpt,
                                              Option<String[]> 
partitionPathFieldOpt,
-                                             TypedProperties props) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partitionNameOverrideOpt, partitionPathFieldOpt, props);
+                                             TypedProperties props,
+                                             HoodieReadStats readStats) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index be842cad4c4..df6097eb7f7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieKeyBasedF
                                                   RecordMergeMode 
recordMergeMode,
                                                   Option<String> 
partitionNameOverrideOpt,
                                                   Option<String[]> 
partitionPathFieldOpt,
-                                                  TypedProperties props) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partitionNameOverrideOpt, partitionPathFieldOpt, props);
+                                                  TypedProperties props,
+                                                  HoodieReadStats readStats) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
index d3617dba499..6e69a5e9d09 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
@@ -36,6 +36,22 @@ public class HoodieReadStats implements Serializable {
   private long numUpdates = 0L;
   // Total number of records deleted
   protected long numDeletes;
+  // Time reading and merging all records from the log files
+  protected long totalLogReadTimeMs;
+  // Total number of log records that were compacted by a compaction operation
+  protected long totalLogRecords;
+  // Total number of log files compacted for a file slice with this base fileid
+  protected long totalLogFilesCompacted;
+  // Total size of all log files for a file slice with this base fileid
+  protected long totalLogSizeCompacted;
+  // Total number of records updated by a compaction operation
+  protected long totalUpdatedRecordsCompacted;
+  // Total number of log blocks seen in a compaction operation
+  protected long totalLogBlocks;
+  // Total number of corrupt blocks seen in a compaction operation
+  protected long totalCorruptLogBlock;
+  // Total number of rollback blocks seen in a compaction operation
+  protected long totalRollbackBlocks;
 
   public HoodieReadStats() {
   }
@@ -58,6 +74,38 @@ public class HoodieReadStats implements Serializable {
     return numDeletes;
   }
 
+  public long getTotalLogReadTimeMs() {
+    return totalLogReadTimeMs;
+  }
+
+  public long getTotalLogRecords() {
+    return totalLogRecords;
+  }
+
+  public long getTotalLogFilesCompacted() {
+    return totalLogFilesCompacted;
+  }
+
+  public long getTotalUpdatedRecordsCompacted() {
+    return totalUpdatedRecordsCompacted;
+  }
+
+  public long getTotalLogSizeCompacted() {
+    return totalLogSizeCompacted;
+  }
+
+  public long getTotalLogBlocks() {
+    return totalLogBlocks;
+  }
+
+  public long getTotalCorruptLogBlock() {
+    return totalCorruptLogBlock;
+  }
+
+  public long getTotalRollbackBlocks() {
+    return totalRollbackBlocks;
+  }
+
   public void incrementNumInserts() {
     numInserts++;
   }
@@ -69,4 +117,36 @@ public class HoodieReadStats implements Serializable {
   public void incrementNumDeletes() {
     numDeletes++;
   }
+
+  public void setTotalLogReadTimeMs(long totalLogReadTimeMs) {
+    this.totalLogReadTimeMs = totalLogReadTimeMs;
+  }
+
+  public void setTotalLogRecords(long totalLogRecords) {
+    this.totalLogRecords = totalLogRecords;
+  }
+
+  public void setTotalLogFilesCompacted(long totalLogFilesCompacted) {
+    this.totalLogFilesCompacted = totalLogFilesCompacted;
+  }
+
+  public void setTotalUpdatedRecordsCompacted(long totalUpdatedRecordsCompacted) {
+    this.totalUpdatedRecordsCompacted = totalUpdatedRecordsCompacted;
+  }
+
+  public void setTotalLogSizeCompacted(long totalLogSizeCompacted) {
+    this.totalLogSizeCompacted = totalLogSizeCompacted;
+  }
+
+  public void setTotalLogBlocks(long totalLogBlocks) {
+    this.totalLogBlocks = totalLogBlocks;
+  }
+
+  public void setTotalCorruptLogBlock(long totalCorruptLogBlock) {
+    this.totalCorruptLogBlock = totalCorruptLogBlock;
+  }
+
+  public void setTotalRollbackBlocks(long totalRollbackBlocks) {
+    this.totalRollbackBlocks = totalRollbackBlocks;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
index 6384b71afb2..0838174a2c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -51,8 +51,9 @@ public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
       RecordMergeMode recordMergeMode,
       Option<String> partitionNameOverrideOpt,
       Option<String[]> partitionPathFieldOpt,
-      TypedProperties props) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props);
+      TypedProperties props,
+      HoodieReadStats readStats) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index bed0d12d0e8..e27c96d08b3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
 import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -72,6 +73,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
   private Schema avroSchema;
   private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
   private String partitionPath;
+  private HoodieReadStats readStats;
 
   public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
     Map<String, String> writeConfigs = new HashMap<>();
@@ -130,13 +132,15 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
       writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), getCustomPayload());
       writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     }
+    readStats = new HoodieReadStats();
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
         mergeMode,
         partitionNameOpt,
         partitionFields,
-        props);
+        props,
+        readStats);
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 1a3c4bfa0e7..03560c8597d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CommitUtils;
@@ -55,6 +56,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -181,7 +183,9 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
   public void testRecordGenerationAPIsForMOR() throws IOException {
     HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
     cleanupClients();
-    initMetaClient(tableType);
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "timestamp");
+    initMetaClient(tableType, props);
     cleanupTimelineService();
     initTimelineService();
 


Reply via email to