This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 21275279f58 [HUDI-8659] Remove ENUM type check and add other fixes for file group reader-based compaction (#12427)
21275279f58 is described below
commit 21275279f58d75b951fd7c1321aed4b71f287c3e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Dec 5 04:35:29 2024 -0800
[HUDI-8659] Remove ENUM type check and add other fixes for file group reader-based compaction (#12427)
* [HUDI-8659] Remove ENUM type check for file group reader
* Fix validation in GenericRecordValidationTestUtils
* Disable file group reader for log compaction
* Add log read time to the runtime stat in the write stat for the new merge handle
* Add more stats
* Fix testRecordGenerationAPIsForMOR
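Note: the patch threads a shared HoodieReadStats instance from HoodieFileGroupReader into the record buffers and copies those stats into the HoodieWriteStat when the new merge handle closes. A minimal illustrative sketch of that flow (not part of this patch; it assumes a fileGroupReader and writeStatus are already constructed, and only uses calls introduced below):

    HoodieReadStats readStats = fileGroupReader.getStats();  // populated while scanning and merging log files
    writeStatus.getStat().setTotalLogReadTimeMs(readStats.getTotalLogReadTimeMs());
    writeStatus.getStat().setTotalLogRecords(readStats.getTotalLogRecords());
    writeStatus.getStat().setTotalLogFilesCompacted(readStats.getTotalLogFilesCompacted());
    writeStatus.getStat().setTotalUpdatedRecordsCompacted(readStats.getTotalUpdatedRecordsCompacted());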
---
.../hudi/table/action/compact/HoodieCompactor.java | 9 +-
.../compact/RunCompactionActionExecutor.java | 2 +-
.../GenericRecordValidationTestUtils.java | 4 +
...HoodieSparkFileGroupReaderBasedMergeHandle.java | 33 +++++--
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 28 ------
.../apache/hudi/common/model/HoodieWriteStat.java | 100 +--------------------
.../read/HoodieBaseFileGroupRecordBuffer.java | 18 ++--
.../common/table/read/HoodieFileGroupReader.java | 46 +++++++---
.../table/read/HoodieFileGroupRecordBuffer.java | 5 --
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 5 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 5 +-
.../hudi/common/table/read/HoodieReadStats.java | 80 +++++++++++++++++
.../read/HoodieUnmergedFileGroupRecordBuffer.java | 5 +-
...stHoodiePositionBasedFileGroupRecordBuffer.java | 6 +-
.../TestMetadataUtilRLIandSIRecordGeneration.java | 6 +-
15 files changed, 176 insertions(+), 176 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 96f9316902d..88d90a850cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -102,7 +102,8 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
* Execute compaction operations and report back status.
*/
public HoodieData<WriteStatus> compact(
- HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+ HoodieEngineContext context, WriteOperationType operationType,
+ HoodieCompactionPlan compactionPlan,
HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
HoodieCompactionHandler compactionHandler) {
if (compactionPlan == null || (compactionPlan.getOperations() == null)
@@ -145,9 +146,9 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
boolean useFileGroupReaderBasedCompaction =
context.supportsFileGroupReader() // the engine needs to support fg reader first
&& !metaClient.isMetadataTable()
&& config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+ && operationType == WriteOperationType.COMPACT
&& !hasBootstrapFile(operations) // bootstrap file read for fg reader is not ready
&& StringUtils.isNullOrEmpty(config.getInternalSchema()) // schema evolution support for fg reader is not ready
- && !containsUnsupportedTypesForFileGroupReader(config.getSchema()) // Enum type support by fg reader is not ready
&& config.populateMetaFields(); // Virtual key support by fg reader is not ready
if (useFileGroupReaderBasedCompaction) {
@@ -325,8 +326,4 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
private boolean hasBootstrapFile(List<CompactionOperation> operationList) {
return operationList.stream().anyMatch(operation -> operation.getBootstrapFilePath().isPresent());
}
-
- private boolean containsUnsupportedTypesForFileGroupReader(String schemaStr) {
- return HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(new Schema.Parser().parse(schemaStr));
- }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 066776e1743..276bec1b9bb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -106,7 +106,7 @@ public class RunCompactionActionExecutor<T> extends
}
HoodieData<WriteStatus> statuses = compactor.compact(
- context, compactionPlan, table, configCopy, instantTime, compactionHandler);
+ context, operationType, compactionPlan, table, configCopy, instantTime, compactionHandler);
compactor.maybePersist(statuses, context, config, instantTime);
context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 1b0ab98ad03..67d189ed1b1 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -81,6 +81,10 @@ public class GenericRecordValidationTestUtils {
HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) value2));
} else if (value1 instanceof Text && value2 instanceof BytesWritable) {
assertArrayEquals(((Text) value1).getBytes(), ((BytesWritable) value2).getBytes());
+ } else if (f.schema().getType() == Schema.Type.ENUM
+ && value1 instanceof BytesWritable && value2 instanceof Text) {
+ // TODO(HUDI-8660): Revisit ENUM handling in Spark parquet reader and writer
+ assertArrayEquals(((BytesWritable) value1).getBytes(), ((Text) value2).getBytes());
} else {
assertEquals(value1, value2, "Field name " + fieldName + " is not same."
+ " Val1: " + value1 + ", Val2:" + value2);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index b7d8d2f80f2..947ce08efee 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -20,6 +20,7 @@
package org.apache.hudi.io;
import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -82,6 +83,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
protected HoodieReaderContext readerContext;
protected FileSlice fileSlice;
protected Configuration conf;
+ protected HoodieReadStats readStats;
public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
CompactionOperation operation, TaskContextSupplier taskContextSupplier,
@@ -225,11 +227,11 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
// The stats of inserts, updates, and deletes are updated once at the end
// These will be set in the write stat when closing the merge handle
- HoodieReadStats stats = fileGroupReader.getStats();
- this.insertRecordsWritten = stats.getNumInserts();
- this.updatedRecordsWritten = stats.getNumUpdates();
- this.recordsDeleted = stats.getNumDeletes();
- this.recordsWritten = stats.getNumInserts() + stats.getNumUpdates();
+ this.readStats = fileGroupReader.getStats();
+ this.insertRecordsWritten = readStats.getNumInserts();
+ this.updatedRecordsWritten = readStats.getNumUpdates();
+ this.recordsDeleted = readStats.getNumDeletes();
+ this.recordsWritten = readStats.getNumInserts() + readStats.getNumUpdates();
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to compact file slice: " + fileSlice, e);
@@ -265,4 +267,25 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
protected void writeIncomingRecords() {
// no operation.
}
+
+ @Override
+ public List<WriteStatus> close() {
+ try {
+ super.close();
+ writeStatus.getStat().setTotalLogReadTimeMs(readStats.getTotalLogReadTimeMs());
+ writeStatus.getStat().setTotalUpdatedRecordsCompacted(readStats.getTotalUpdatedRecordsCompacted());
+ writeStatus.getStat().setTotalLogFilesCompacted(readStats.getTotalLogFilesCompacted());
+ writeStatus.getStat().setTotalLogRecords(readStats.getTotalLogRecords());
+ writeStatus.getStat().setTotalLogBlocks(readStats.getTotalLogBlocks());
+ writeStatus.getStat().setTotalCorruptLogBlock(readStats.getTotalCorruptLogBlock());
+ writeStatus.getStat().setTotalRollbackBlocks(readStats.getTotalRollbackBlocks());
+
+ if (writeStatus.getStat().getRuntimeStats() != null) {
+ writeStatus.getStat().getRuntimeStats().setTotalScanTime(readStats.getTotalLogReadTimeMs());
+ }
+ return Collections.singletonList(writeStatus);
+ } catch (Exception e) {
+ throw new HoodieUpsertException("Failed to close
HoodieSparkFileGroupReaderBasedMergeHandle", e);
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 06380d36419..7e67e41581e 100644
index 06380d36419..7e67e41581e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1487,32 +1487,4 @@ public class HoodieAvroUtils {
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
}
}
-
- /**
- * Returns whether the schema contains types not supported by the file group reader.
- * Right now only ENUM type in Avro has known issues.
- *
- * @param schema Avro schema
- *
- * @return whether the schema contains types not supported by the file group reader.
- */
- public static boolean containsUnsupportedTypesForFileGroupReader(Schema schema) {
- switch (schema.getType()) {
- case RECORD:
- for (Field field : schema.getFields()) {
- if (containsUnsupportedTypesForFileGroupReader(field.schema())) {
- return true;
- }
- }
- return false;
- case ARRAY:
- return containsUnsupportedTypesForFileGroupReader(schema.getElementType());
- case MAP:
- return containsUnsupportedTypesForFileGroupReader(schema.getValueType());
- case UNION:
- return containsUnsupportedTypesForFileGroupReader(getActualSchemaFromUnion(schema, null));
- default:
- return schema.getType() == Schema.Type.ENUM;
- }
- }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 5a415847277..395ba9fdfc2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -91,48 +91,6 @@ public class HoodieWriteStat extends HoodieReadStats {
@Nullable
private String partitionPath;
- /**
- * Total number of log records that were compacted by a compaction operation.
- */
- @Nullable
- private long totalLogRecords;
-
- /**
- * Total number of log files compacted for a file slice with this base fileid.
- */
- @Nullable
- private long totalLogFilesCompacted;
-
- /**
- * Total size of all log files for a file slice with this base fileid.
- */
- @Nullable
- private long totalLogSizeCompacted;
-
- /**
- * Total number of records updated by a compaction operation.
- */
- @Nullable
- private long totalUpdatedRecordsCompacted;
-
- /**
- * Total number of log blocks seen in a compaction operation.
- */
- @Nullable
- private long totalLogBlocks;
-
- /**
- * Total number of corrupt blocks seen in a compaction operation.
- */
- @Nullable
- private long totalCorruptLogBlock;
-
- /**
- * Total number of rollback blocks seen in a compaction operation.
- */
- @Nullable
- private long totalRollbackBlocks;
-
/**
* File Size as of close.
*/
@@ -241,30 +199,6 @@ public class HoodieWriteStat extends HoodieReadStats {
this.partitionPath = partitionPath;
}
- public long getTotalLogRecords() {
- return totalLogRecords;
- }
-
- public void setTotalLogRecords(long totalLogRecords) {
- this.totalLogRecords = totalLogRecords;
- }
-
- public long getTotalLogFilesCompacted() {
- return totalLogFilesCompacted;
- }
-
- public void setTotalLogFilesCompacted(long totalLogFilesCompacted) {
- this.totalLogFilesCompacted = totalLogFilesCompacted;
- }
-
- public long getTotalUpdatedRecordsCompacted() {
- return totalUpdatedRecordsCompacted;
- }
-
- public void setTotalUpdatedRecordsCompacted(long totalUpdatedRecordsCompacted) {
- this.totalUpdatedRecordsCompacted = totalUpdatedRecordsCompacted;
- }
-
public void setTempPath(String tempPath) {
this.tempPath = tempPath;
}
@@ -272,39 +206,7 @@ public class HoodieWriteStat extends HoodieReadStats {
public String getTempPath() {
return this.tempPath;
}
-
- public long getTotalLogSizeCompacted() {
- return totalLogSizeCompacted;
- }
-
- public void setTotalLogSizeCompacted(long totalLogSizeCompacted) {
- this.totalLogSizeCompacted = totalLogSizeCompacted;
- }
-
- public long getTotalLogBlocks() {
- return totalLogBlocks;
- }
-
- public void setTotalLogBlocks(long totalLogBlocks) {
- this.totalLogBlocks = totalLogBlocks;
- }
-
- public long getTotalCorruptLogBlock() {
- return totalCorruptLogBlock;
- }
-
- public void setTotalCorruptLogBlock(long totalCorruptLogBlock) {
- this.totalCorruptLogBlock = totalCorruptLogBlock;
- }
-
- public long getTotalRollbackBlocks() {
- return totalRollbackBlocks;
- }
-
- public void setTotalRollbackBlocks(long totalRollbackBlocks) {
- this.totalRollbackBlocks = totalRollbackBlocks;
- }
-
+
public long getFileSizeInBytes() {
return fileSizeInBytes;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 96a74860e8a..1f2879a70f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -90,7 +90,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
protected final Option<String> payloadClass;
protected final TypedProperties props;
protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records;
- protected final HoodieReadStats readerStats;
+ protected final HoodieReadStats readStats;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
protected T nextRecord;
@@ -103,7 +103,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
RecordMergeMode recordMergeMode,
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
- TypedProperties props) {
+ TypedProperties props,
+ HoodieReadStats readStats) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
this.partitionNameOverrideOpt = partitionNameOverrideOpt;
@@ -127,7 +128,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
boolean isBitCaskDiskMapCompressionEnabled = props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
- this.readerStats = new HoodieReadStats();
+ this.readStats = readStats;
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -142,11 +143,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
this.baseFileIterator = baseFileIterator;
}
- @Override
- public HoodieReadStats getStats() {
- return readerStats;
- }
-
/**
* This allows hasNext() to be called multiple times without incrementing the iterator by more than 1
* record. It does come with the caveat that hasNext() must be called every time before next(). But
@@ -511,18 +507,18 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
if (resultRecord.isPresent()) {
// Updates
nextRecord = readerContext.seal(resultRecord.get());
- readerStats.incrementNumUpdates();
+ readStats.incrementNumUpdates();
return true;
} else {
// Deletes
- readerStats.incrementNumDeletes();
+ readStats.incrementNumDeletes();
return false;
}
}
// Inserts
nextRecord = readerContext.seal(baseRecord);
- readerStats.incrementNumInserts();
+ readStats.incrementNumInserts();
return true;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 5bac955e22d..003a5ffa3f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
@@ -80,6 +79,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private final HoodieFileGroupRecordBuffer<T> recordBuffer;
private ClosableIterator<T> baseFileIterator;
private final Option<UnaryOperator<T>> outputConverter;
+ private final HoodieReadStats readStats;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
HoodieStorage storage,
@@ -120,22 +120,34 @@ public final class HoodieFileGroupReader<T> implements Closeable {
? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
: new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
- this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition);
+ this.readStats = new HoodieReadStats();
+ this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
+ tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge,
+ shouldUseRecordPosition, readStats);
}
/**
* Initialize correct record buffer
*/
- private static HoodieFileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext, HoodieTableMetaClient hoodieTableMetaClient, RecordMergeMode recordMergeMode,
- TypedProperties props, boolean hasNoLogFiles, boolean isSkipMerge, boolean shouldUseRecordPosition) {
+ private static HoodieFileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ TypedProperties props,
+ boolean hasNoLogFiles,
+ boolean isSkipMerge,
+ boolean shouldUseRecordPosition,
+ HoodieReadStats readStats) {
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
- return new HoodieUnmergedFileGroupRecordBuffer<>(readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+ return new HoodieUnmergedFileGroupRecordBuffer<>(
+ readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
} else if (shouldUseRecordPosition) {
- return new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+ return new HoodiePositionBasedFileGroupRecordBuffer<>(
+ readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
} else {
- return new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props);
+ return new HoodieKeyBasedFileGroupRecordBuffer<>(
+ readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
}
}
@@ -237,10 +249,11 @@ public final class HoodieFileGroupReader<T> implements Closeable {
}
}
+ /**
+ * @return statistics of reading a file group.
+ */
public HoodieReadStats getStats() {
- ValidationUtils.checkArgument(recordBuffer != null, "Only support getting reader stats from log merging now");
- return recordBuffer.getStats();
+ return readStats;
}
/**
@@ -256,7 +269,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private void scanLogFiles() {
String path = readerContext.getTablePath();
- HoodieMergedLogRecordReader logRecordReader = HoodieMergedLogRecordReader.newBuilder()
+ try (HoodieMergedLogRecordReader logRecordReader = HoodieMergedLogRecordReader.newBuilder()
.withHoodieReaderContext(readerContext)
.withStorage(storage)
.withLogFiles(logFiles)
@@ -265,8 +278,15 @@ public final class HoodieFileGroupReader<T> implements Closeable {
.withPartition(getRelativePartitionPath(
new StoragePath(path), logFiles.get(0).getPath().getParent()))
.withRecordBuffer(recordBuffer)
- .build();
- logRecordReader.close();
+ .build()) {
+ readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
+ readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
+ readStats.setTotalLogFilesCompacted(logRecordReader.getTotalLogFiles());
+ readStats.setTotalLogRecords(logRecordReader.getTotalLogRecords());
+ readStats.setTotalLogBlocks(logRecordReader.getTotalLogBlocks());
+ readStats.setTotalCorruptLogBlock(logRecordReader.getTotalCorruptBlocks());
+ readStats.setTotalRollbackBlocks(logRecordReader.getTotalRollbacks());
+ }
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 139c3529475..d9ba8bcd90e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -107,11 +107,6 @@ public interface HoodieFileGroupRecordBuffer<T> {
*/
void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
- /**
- * @return statistics of log merging.
- */
- HoodieReadStats getStats();
-
/**
* Check if next merged record exists.
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 97712fe6be7..f501a3e78f5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
RecordMergeMode recordMergeMode,
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
- TypedProperties props) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props);
+ TypedProperties props,
+ HoodieReadStats readStats) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index be842cad4c4..df6097eb7f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
RecordMergeMode recordMergeMode,
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
- TypedProperties props) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props);
+ TypedProperties props,
+ HoodieReadStats readStats) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
index d3617dba499..6e69a5e9d09 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
@@ -36,6 +36,22 @@ public class HoodieReadStats implements Serializable {
private long numUpdates = 0L;
// Total number of records deleted
protected long numDeletes;
+ // Time reading and merging all records from the log files
+ protected long totalLogReadTimeMs;
+ // Total number of log records that were compacted by a compaction operation
+ protected long totalLogRecords;
+ // Total number of log files compacted for a file slice with this base fileid
+ protected long totalLogFilesCompacted;
+ // Total size of all log files for a file slice with this base fileid
+ protected long totalLogSizeCompacted;
+ // Total number of records updated by a compaction operation
+ protected long totalUpdatedRecordsCompacted;
+ // Total number of log blocks seen in a compaction operation
+ protected long totalLogBlocks;
+ // Total number of corrupt blocks seen in a compaction operation
+ protected long totalCorruptLogBlock;
+ // Total number of rollback blocks seen in a compaction operation
+ protected long totalRollbackBlocks;
public HoodieReadStats() {
}
@@ -58,6 +74,38 @@ public class HoodieReadStats implements Serializable {
return numDeletes;
}
+ public long getTotalLogReadTimeMs() {
+ return totalLogReadTimeMs;
+ }
+
+ public long getTotalLogRecords() {
+ return totalLogRecords;
+ }
+
+ public long getTotalLogFilesCompacted() {
+ return totalLogFilesCompacted;
+ }
+
+ public long getTotalUpdatedRecordsCompacted() {
+ return totalUpdatedRecordsCompacted;
+ }
+
+ public long getTotalLogSizeCompacted() {
+ return totalLogSizeCompacted;
+ }
+
+ public long getTotalLogBlocks() {
+ return totalLogBlocks;
+ }
+
+ public long getTotalCorruptLogBlock() {
+ return totalCorruptLogBlock;
+ }
+
+ public long getTotalRollbackBlocks() {
+ return totalRollbackBlocks;
+ }
+
public void incrementNumInserts() {
numInserts++;
}
@@ -69,4 +117,36 @@ public class HoodieReadStats implements Serializable {
public void incrementNumDeletes() {
numDeletes++;
}
+
+ public void setTotalLogReadTimeMs(long totalLogReadTimeMs) {
+ this.totalLogReadTimeMs = totalLogReadTimeMs;
+ }
+
+ public void setTotalLogRecords(long totalLogRecords) {
+ this.totalLogRecords = totalLogRecords;
+ }
+
+ public void setTotalLogFilesCompacted(long totalLogFilesCompacted) {
+ this.totalLogFilesCompacted = totalLogFilesCompacted;
+ }
+
+ public void setTotalUpdatedRecordsCompacted(long totalUpdatedRecordsCompacted) {
+ this.totalUpdatedRecordsCompacted = totalUpdatedRecordsCompacted;
+ }
+
+ public void setTotalLogSizeCompacted(long totalLogSizeCompacted) {
+ this.totalLogSizeCompacted = totalLogSizeCompacted;
+ }
+
+ public void setTotalLogBlocks(long totalLogBlocks) {
+ this.totalLogBlocks = totalLogBlocks;
+ }
+
+ public void setTotalCorruptLogBlock(long totalCorruptLogBlock) {
+ this.totalCorruptLogBlock = totalCorruptLogBlock;
+ }
+
+ public void setTotalRollbackBlocks(long totalRollbackBlocks) {
+ this.totalRollbackBlocks = totalRollbackBlocks;
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
index 6384b71afb2..0838174a2c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -51,8 +51,9 @@ public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
RecordMergeMode recordMergeMode,
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
- TypedProperties props) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props);
+ TypedProperties props,
+ HoodieReadStats readStats) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index bed0d12d0e8..e27c96d08b3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -72,6 +73,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
private Schema avroSchema;
private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
private String partitionPath;
+ private HoodieReadStats readStats;
public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
Map<String, String> writeConfigs = new HashMap<>();
@@ -130,13 +132,15 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), getCustomPayload());
writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
}
+ readStats = new HoodieReadStats();
buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
ctx,
metaClient,
mergeMode,
partitionNameOpt,
partitionFields,
- props);
+ props,
+ readStats);
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 1a3c4bfa0e7..03560c8597d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CommitUtils;
@@ -55,6 +56,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -181,7 +183,9 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
public void testRecordGenerationAPIsForMOR() throws IOException {
HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
cleanupClients();
- initMetaClient(tableType);
+ Properties props = new Properties();
+ props.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "timestamp");
+ initMetaClient(tableType, props);
cleanupTimelineService();
initTimelineService();