This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0afeb0ecc2512b2e529482592ec9bee5af229d1d
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Mon Dec 12 16:20:45 2022 +0530

    [HUDI-5353] Close file readers (#7412)
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  3 +-
 .../hudi/table/action/commit/BaseMergeHelper.java  | 32 -------------------
 .../table/action/commit/HoodieMergeHelper.java     | 27 +++++++++++++---
 .../run/strategy/JavaExecutionStrategy.java        | 16 +++++++---
 .../hudi/table/action/commit/JavaMergeHelper.java  | 13 +++++++-
 .../MultipleSparkJobExecutionStrategy.java         | 36 +++++++++++-----------
 .../strategy/SingleSparkJobExecutionStrategy.java  |  2 --
 .../org/apache/hudi/hadoop/InputSplitUtils.java    | 28 -----------------
 .../utils/HoodieRealtimeRecordReaderUtils.java     | 27 +++-------------
 .../utilities/HoodieMetadataTableValidator.java    | 31 +++++++++----------
 11 files changed, 86 insertions(+), 132 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d6872276ac3..6bbea356e51 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -150,11 +150,10 @@ public class HoodieIndexUtils {
       Configuration configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = HoodieTimer.start();
-        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
         Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
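The HoodieIndexUtils hunk above is the simplest shape of the fix: the reader moves into the try header so it is released on every exit path (the HoodieMergeHandle hunk just below makes the same one-line change). A minimal, self-contained sketch of the idiom with a stand-in reader type, since the real HoodieFileReader needs a Hudi classpath; the stand-in merely has to be closeable, which is also what lets the real diff compile:

import java.io.Closeable;
import java.io.IOException;

public class TryWithResourcesSketch {
  // Stand-in for HoodieFileReader; the real interface is closeable too.
  interface FileReader extends Closeable {
    long getTotalRecords() throws IOException;
  }

  static long countRecords(FileReader opened) throws IOException {
    // The reader is bound in the try header, so close() runs on every
    // exit path -- normal return or exception -- instead of leaking
    // the file handle when getTotalRecords() throws.
    try (FileReader reader = opened) {
      return reader.getTotalRecords();
    }
  }

  public static void main(String[] args) throws IOException {
    FileReader stub = new FileReader() {
      public long getTotalRecords() { return 42; }
      public void close() { System.out.println("reader closed"); }
    };
    System.out.println(countRecords(stub));
  }
}

Before the patch, an exception from getTotalRecords() (or an early return) left the underlying handle open; with the reader in the try header the runtime closes it automatically.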
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 82c6de57614..6e172d01a65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -447,8 +447,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
 
     long oldNumWrites = 0;
-    try {
-      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
      oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140a..8c34e3c3a74 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,15 +18,10 @@
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,14 +30,10 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
@@ -83,29 +74,6 @@ public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
     }
   }
 
-  /**
-   * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
-   * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
-   * for indexing, writing and other functionality.
-   *
-   */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-                                                       HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-                                                       Schema readSchema, boolean externalSchemaTransformation) throws IOException {
-    Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
-    Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
-    Schema bootstrapReadSchema;
-    if (externalSchemaTransformation) {
-      bootstrapReadSchema = bootstrapReader.getSchema();
-    } else {
-      bootstrapReadSchema = mergeHandle.getWriterSchema();
-    }
-
-    return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
-        (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
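getMergingIterator() is deleted above because the helper opened a bootstrap reader internally and handed back only an iterator, so no caller ever held a handle it could close. The HoodieMergeHelper hunks below inline that logic at the call site, where both readers live in nullable locals that the finally block releases. A sketch of that shape with illustrative stub types:

import java.io.Closeable;
import java.io.IOException;

public class TwoReaderCloseSketch {

  // Stand-in for HoodieFileReader; purely illustrative.
  static Closeable open(String name) {
    return () -> System.out.println("closed " + name);
  }

  static void merge(boolean bootstrapped) throws IOException {
    Closeable baseFileReader = null;
    Closeable bootstrapFileReader = null;
    try {
      baseFileReader = open("base");
      if (bootstrapped) {
        // Opened only on the bootstrap branch, hence the nullable local.
        bootstrapFileReader = open("bootstrap");
      }
      // ... iterate and merge records here ...
    } finally {
      // Close both handles no matter how the merge exits.
      if (baseFileReader != null) {
        baseFileReader.close();
      }
      if (bootstrapFileReader != null) {
        bootstrapFileReader.close();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    merge(true);
  }
}

One trade-off visible in the diff: the closes run one after another, so an exception thrown by the first close() would skip the second; try-with-resources would chain it as a suppressed exception instead, but fits less naturally when the second reader is opened midway through the block.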
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 5d1a55453d1..0a7330de454 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,15 +18,16 @@
 package org.apache.hudi.table.action.commit;
 
-import org.apache.avro.SchemaCompatibility;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
@@ -50,6 +51,9 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -80,9 +84,12 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     final GenericDatumWriter<GenericRecord> gWriter;
     final GenericDatumReader<GenericRecord> gReader;
 
+    HoodieFileReader<GenericRecord> baseFileReader = null;
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
     Schema readSchema;
     if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+      baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath());
+      readSchema = baseFileReader.getSchema();
       gWriter = new GenericDatumWriter<>(readSchema);
       gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
     } else {
@@ -126,7 +133,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        ClosableIterator<GenericRecord> baseFileRecordIterator = baseFileReader.getRecordIterator();
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(
+            baseFileRecordIterator,
+            bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         if (needToReWriteRecord) {
           readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
@@ -150,8 +164,11 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
       // and executor firstly and then close mergeHandle.
-      if (reader != null) {
-        reader.close();
+      if (baseFileReader != null) {
+        baseFileReader.close();
+      }
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
       }
       if (null != wrapper) {
         wrapper.shutdownNow();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 456bb3cb47c..c6f885fa916 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -177,9 +177,11 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
     clusteringOps.forEach(clusteringOp -> {
       long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
       LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+      Option<HoodieFileReader> baseFileReader = Option.empty();
+      HoodieMergedLogRecordScanner scanner = null;
       try {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withFileSystem(table.getMetaClient().getFs())
             .withBasePath(table.getMetaClient().getBasePath())
             .withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
             .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
-        Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+        baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
            ? Option.empty() : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+        if (baseFileReader.isPresent()) {
+          baseFileReader.get().close();
+        }
       }
     });
     return records;
@@ -219,9 +228,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try {
+      try (HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-        HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
         Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
         recordIterator.forEachRemaining(record -> records.add(transform(record)));
       } catch (IOException e) {
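In the JavaExecutionStrategy hunks above, scanner and baseFileReader are hoisted out of the try block and pre-initialized (null and Option.empty() respectively), so the new finally block can consult them safely even when the scanner builder itself throws. The same shape as a runnable sketch, with java.util.Optional standing in for Hudi's Option and Closeable lambdas standing in for the scanner and reader:

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;

public class OptionalResourceCloseSketch {
  static void readGroup(boolean hasBaseFile) throws IOException {
    // Pre-initialized before the try, so the finally block below can
    // inspect them even if an exception fires before assignment.
    Optional<Closeable> baseFileReader = Optional.empty();
    Closeable scanner = null;
    try {
      scanner = () -> System.out.println("scanner closed");
      if (hasBaseFile) {
        // The reader exists only for operations with a base file.
        baseFileReader = Optional.of(() -> System.out.println("reader closed"));
      }
      // ... merge log records with the optional base file ...
    } finally {
      if (scanner != null) {
        scanner.close();
      }
      if (baseFileReader.isPresent()) {
        baseFileReader.get().close();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    readGroup(true);
  }
}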
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 46dd30a7cb7..69ef4a5d106 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -18,7 +18,9 @@
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -39,6 +41,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -81,10 +84,15 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(reader.getRecordIterator(), bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         readerIterator = reader.getRecordIterator(readSchema);
       }
@@ -107,6 +115,9 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
       if (reader != null) {
         reader.close();
       }
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
+      }
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 03a0c6e36b0..e37eb4316d0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -104,20 +104,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
+            clusteringPlan.getInputGroups().stream()
+                .map(inputGroup -> {
+                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                    return runClusteringForGroupAsyncAsRow(inputGroup,
+                        clusteringPlan.getStrategy().getStrategyParams(),
+                        shouldPreserveMetadata,
+                        instantTime);
+                  }
+                  return runClusteringForGroupAsync(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime);
+                })
+                .collect(Collectors.toList()))
         .join()
         .stream();
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -187,7 +187,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
         .map(listStr -> listStr.split(","));
-    
+
     return orderByColumnsOpt.map(orderByColumns -> {
       HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
       switch (layoutOptStrategy) {
@@ -267,8 +267,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles, apply updates and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
-                                                                  List<ClusteringOperation> clusteringOps,
-                                                                  String instantTime) {
+                                                                   List<ClusteringOperation> clusteringOps,
+                                                                   String instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -317,7 +317,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
-                                                                   List<ClusteringOperation> clusteringOps) {
+                                                                    List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 46d2466c5cf..8606c89c49b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -103,7 +103,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     return writeMetadata;
   }
 
-
   /**
    * Submit job to execute clustering for the group.
    */
@@ -124,7 +123,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
         .flatMap(Collection::stream);
   }
 
-
   /**
    * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
index e485e72c257..5dcd66cd826 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
@@ -22,14 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
 public class InputSplitUtils {
 
@@ -52,24 +44,4 @@ public class InputSplitUtils {
   public static boolean readBoolean(DataInput in) throws IOException {
     return in.readBoolean();
   }
-
-  /**
-   * Return correct base-file schema based on split.
-   *
-   * @param split File Split
-   * @param conf Configuration
-   * @return
-   */
-  public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
-    try {
-      if (split instanceof BootstrapBaseFileSplit) {
-        HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf,
-            ((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
-        return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
-      }
-      return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + split.getPath(), e);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index e3466a6401a..937a20ea3d4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,10 @@
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
@@ -25,8 +29,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -42,20 +44,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.sql.Timestamp;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -69,18 +64,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
-  /**
-   * Reads the schema from the base file.
-   */
-  public static Schema readSchema(Configuration conf, Path filePath) {
-    try {
-      HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
-      return storageReader.getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read schema from " + filePath, e);
-    }
-  }
-
   /**
    * get the max compaction memory in bytes from JobConf.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6566f0c029a..347078ec71c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -152,7 +152,7 @@ public class HoodieMetadataTableValidator implements Serializable {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;
 
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
 
   protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
 
@@ -940,10 +940,10 @@ public class HoodieMetadataTableValidator implements Serializable {
    * verified in the {@link HoodieMetadataTableValidator}.
    */
   private static class HoodieMetadataValidationContext implements Serializable {
-    private HoodieTableMetaClient metaClient;
-    private HoodieTableFileSystemView fileSystemView;
-    private HoodieTableMetadata tableMetadata;
-    private boolean enableMetadataTable;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTableFileSystemView fileSystemView;
+    private final HoodieTableMetadata tableMetadata;
+    private final boolean enableMetadataTable;
     private List<String> allColumnNameList;
 
     public HoodieMetadataValidationContext(
@@ -1038,30 +1038,29 @@ public class HoodieMetadataTableValidator implements Serializable {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
       try {
         return schemaResolver.getTableAvroSchema().getFields().stream()
-            .map(entry -> entry.name()).collect(Collectors.toList());
+            .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
       }
     }
 
     private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
-      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename);
-      HoodieFileReader<IndexedRecord> fileReader;
-      try {
-        fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename);
+      BloomFilter bloomFilter;
+      try (HoodieFileReader<IndexedRecord> fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path)) {
+        bloomFilter = fileReader.readBloomFilter();
+        if (bloomFilter == null) {
+          Log.error("Failed to read bloom filter for " + path);
+          return Option.empty();
+        }
       } catch (IOException e) {
         Log.error("Failed to get file reader for " + path + " " + e.getMessage());
         return Option.empty();
       }
-      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-      if (fileBloomFilter == null) {
-        Log.error("Failed to read bloom filter for " + path);
-        return Option.empty();
-      }
       return Option.of(BloomFilterData.builder()
           .setPartitionPath(partitionPath)
           .setFilename(filename)
-          .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+          .setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
          .build());
     }
   }
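The readBloomFilterFromFile() rewrite above shows the remaining wrinkle: the value produced by the reader must outlive the reader. The bloom filter is captured in a local declared before the try-with-resources block, read inside it, and only serialized after the reader has closed, with every failure degrading to Option.empty(). A self-contained sketch of that shape (Optional and a stub reader replace the Hudi types):

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;

public class ReadThenCloseSketch {
  // Stand-in for a reader that yields a value before being closed.
  interface FileReader extends Closeable {
    byte[] readBloomFilter() throws IOException;
  }

  static Optional<byte[]> readBloomFilter(FileReader opened) {
    // Declared before the try so it stays in scope after the reader closes.
    byte[] bloomFilter;
    try (FileReader reader = opened) {
      bloomFilter = reader.readBloomFilter();
      if (bloomFilter == null) {
        return Optional.empty();
      }
    } catch (IOException e) {
      // Any read failure degrades to an empty result, as in the validator.
      return Optional.empty();
    }
    // The reader is already closed here; only the extracted value is used.
    return Optional.of(bloomFilter);
  }
}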