This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0afeb0ecc2512b2e529482592ec9bee5af229d1d
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Mon Dec 12 16:20:45 2022 +0530

    [HUDI-5353] Close file readers (#7412)
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  3 +-
 .../hudi/table/action/commit/BaseMergeHelper.java  | 32 -------------------
 .../table/action/commit/HoodieMergeHelper.java     | 27 +++++++++++++---
 .../run/strategy/JavaExecutionStrategy.java        | 16 +++++++---
 .../hudi/table/action/commit/JavaMergeHelper.java  | 13 +++++++-
 .../MultipleSparkJobExecutionStrategy.java         | 36 +++++++++++-----------
 .../strategy/SingleSparkJobExecutionStrategy.java  |  2 --
 .../org/apache/hudi/hadoop/InputSplitUtils.java    | 28 -----------------
 .../utils/HoodieRealtimeRecordReaderUtils.java     | 27 +++-------------
 .../utilities/HoodieMetadataTableValidator.java    | 31 +++++++++----------
 11 files changed, 86 insertions(+), 132 deletions(-)
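
This patch closes HoodieFileReader instances (and the HoodieMergedLogRecordScanner in JavaExecutionStrategy) that were previously opened and never released, either by opening the reader in a try-with-resources block or by closing it explicitly in a finally block. A minimal sketch of the pattern, assuming a Hadoop Configuration and a base-file Path are already in scope (the helper class and method names below are hypothetical, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.storage.HoodieFileReader;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    import java.io.IOException;

    // Hypothetical helper illustrating the pattern applied by this patch:
    // the reader is opened in try-with-resources so close() always runs,
    // even if reading the record count throws.
    final class FileReaderCloseSketch {
      static long totalRecords(Configuration conf, Path baseFilePath) throws IOException {
        try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath)) {
          return reader.getTotalRecords(); // reader is closed automatically on exit
        }
      }
    }

Where two readers are in play (the base file plus a bootstrap file, as in HoodieMergeHelper and JavaMergeHelper), the patch instead keeps each reader in a local variable and closes whichever ones are non-null in the finally block.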

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d6872276ac3..6bbea356e51 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -150,11 +150,10 @@ public class HoodieIndexUtils {
                                                 Configuration configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = HoodieTimer.start();
-        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
         Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 82c6de57614..6e172d01a65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -447,8 +447,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
 
     long oldNumWrites = 0;
-    try {
-      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140a..8c34e3c3a74 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,15 +18,10 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,14 +30,10 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
@@ -83,29 +74,6 @@ public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
     }
   }
 
-  /**
-   * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
-   * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
-   * for indexing, writing and other functionality.
-   *
-   */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-                                                       HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-                                                       Schema readSchema, boolean externalSchemaTransformation) throws IOException {
-    Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
-    Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
-    Schema bootstrapReadSchema;
-    if (externalSchemaTransformation) {
-      bootstrapReadSchema = bootstrapReader.getSchema();
-    } else {
-      bootstrapReadSchema = mergeHandle.getWriterSchema();
-    }
-
-    return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
-        (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 5d1a55453d1..0a7330de454 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,15 +18,16 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.avro.SchemaCompatibility;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
@@ -50,6 +51,9 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -80,9 +84,12 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
 
     final GenericDatumWriter<GenericRecord> gWriter;
     final GenericDatumReader<GenericRecord> gReader;
+    HoodieFileReader<GenericRecord> baseFileReader = null;
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
     Schema readSchema;
     if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+      baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath());
+      readSchema = baseFileReader.getSchema();
       gWriter = new GenericDatumWriter<>(readSchema);
       gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
     } else {
@@ -126,7 +133,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        ClosableIterator<GenericRecord> baseFileRecordIterator = baseFileReader.getRecordIterator();
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(
+            baseFileRecordIterator,
+            bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         if (needToReWriteRecord) {
           readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
@@ -150,8 +164,11 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
       // and executor firstly and then close mergeHandle.
-      if (reader != null) {
-        reader.close();
+      if (baseFileReader != null) {
+        baseFileReader.close();
+      }
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
       }
       if (null != wrapper) {
         wrapper.shutdownNow();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 456bb3cb47c..c6f885fa916 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -177,9 +177,11 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
     clusteringOps.forEach(clusteringOp -> {
       long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
       LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+      Option<HoodieFileReader> baseFileReader = Option.empty();
+      HoodieMergedLogRecordScanner scanner = null;
       try {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withFileSystem(table.getMetaClient().getFs())
             .withBasePath(table.getMetaClient().getBasePath())
             .withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
             .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
-        Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+        baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
            : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+        if (baseFileReader.isPresent()) {
+          baseFileReader.get().close();
+        }
       }
     });
     return records;
@@ -219,9 +228,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try {
+      try (HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-        HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
         Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
         recordIterator.forEachRemaining(record -> records.add(transform(record)));
       } catch (IOException e) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 46dd30a7cb7..69ef4a5d106 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -39,6 +41,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -81,10 +84,15 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(reader.getRecordIterator(), bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         readerIterator = reader.getRecordIterator(readSchema);
       }
@@ -107,6 +115,9 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
       if (reader != null) {
         reader.close();
       }
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
+      }
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 03a0c6e36b0..e37eb4316d0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -104,20 +104,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
+            clusteringPlan.getInputGroups().stream()
+                .map(inputGroup -> {
+                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                    return runClusteringForGroupAsyncAsRow(inputGroup,
+                        clusteringPlan.getStrategy().getStrategyParams(),
+                        shouldPreserveMetadata,
+                        instantTime);
+                  }
+                  return runClusteringForGroupAsync(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime);
+                })
+                .collect(Collectors.toList()))
         .join()
         .stream();
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -187,7 +187,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
-    
+
     return orderByColumnsOpt.map(orderByColumns -> {
       HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
       switch (layoutOptStrategy) {
@@ -267,8 +267,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles, apply updates and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
-                                                               List<ClusteringOperation> clusteringOps,
-                                                               String instantTime) {
+                                                                  List<ClusteringOperation> clusteringOps,
+                                                                  String instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -317,7 +317,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
-                                                                List<ClusteringOperation> clusteringOps) {
+                                                                   List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 46d2466c5cf..8606c89c49b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -103,7 +103,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     return writeMetadata;
   }
 
-
   /**
    * Submit job to execute clustering for the group.
    */
@@ -124,7 +123,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
         .flatMap(Collection::stream);
   }
 
-
   /**
   * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
index e485e72c257..5dcd66cd826 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
@@ -22,14 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
 public class InputSplitUtils {
 
@@ -52,24 +44,4 @@ public class InputSplitUtils {
   public static boolean readBoolean(DataInput in) throws IOException {
     return in.readBoolean();
   }
-
-  /**
-   * Return correct base-file schema based on split.
-   *
-   * @param split File Split
-   * @param conf Configuration
-   * @return
-   */
-  public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
-    try {
-      if (split instanceof BootstrapBaseFileSplit) {
-        HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf,
-            ((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
-        return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
-      }
-      return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + split.getPath(), e);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index e3466a6401a..937a20ea3d4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
@@ -25,8 +29,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -42,20 +44,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.sql.Timestamp;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -69,18 +64,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
-  /**
-   * Reads the schema from the base file.
-   */
-  public static Schema readSchema(Configuration conf, Path filePath) {
-    try {
-      HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
-      return storageReader.getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read schema from " + filePath, e);
-    }
-  }
-
   /**
    * get the max compaction memory in bytes from JobConf.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6566f0c029a..347078ec71c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -152,7 +152,7 @@ public class HoodieMetadataTableValidator implements Serializable {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;
 
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
 
   protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
 
@@ -940,10 +940,10 @@ public class HoodieMetadataTableValidator implements Serializable {
    * verified in the {@link HoodieMetadataTableValidator}.
    */
   private static class HoodieMetadataValidationContext implements Serializable {
-    private HoodieTableMetaClient metaClient;
-    private HoodieTableFileSystemView fileSystemView;
-    private HoodieTableMetadata tableMetadata;
-    private boolean enableMetadataTable;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTableFileSystemView fileSystemView;
+    private final HoodieTableMetadata tableMetadata;
+    private final boolean enableMetadataTable;
     private List<String> allColumnNameList;
 
     public HoodieMetadataValidationContext(
@@ -1038,30 +1038,29 @@ public class HoodieMetadataTableValidator implements Serializable {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
       try {
         return schemaResolver.getTableAvroSchema().getFields().stream()
-            .map(entry -> entry.name()).collect(Collectors.toList());
+            .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
       }
     }
 
     private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
-      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename);
-      HoodieFileReader<IndexedRecord> fileReader;
-      try {
-        fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename);
+      BloomFilter bloomFilter;
+      try (HoodieFileReader<IndexedRecord> fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path)) {
+        bloomFilter = fileReader.readBloomFilter();
+        if (bloomFilter == null) {
+          Log.error("Failed to read bloom filter for " + path);
+          return Option.empty();
+        }
       } catch (IOException e) {
         Log.error("Failed to get file reader for " + path + " " + e.getMessage());
         return Option.empty();
       }
-      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-      if (fileBloomFilter == null) {
-        Log.error("Failed to read bloom filter for " + path);
-        return Option.empty();
-      }
       return Option.of(BloomFilterData.builder()
           .setPartitionPath(partitionPath)
           .setFilename(filename)
-          .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+          .setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
           .build());
     }
   }
