This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new bec23bd  [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
bec23bd is described below

commit bec23bda50b5252013af54dcf39c3a645eea6c7e
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Tue Aug 3 13:55:35 2021 +0800

    [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  1 -
 .../util/collection/ExternalSpillableMap.java      |  2 +
 .../hudi/sink/bootstrap/BootstrapFunction.java     | 27 ++---------
 .../org/apache/hudi/table/format/FormatUtils.java  | 24 ++++++++++
 .../table/format/mor/MergeOnReadInputFormat.java   | 56 ++++++++++++++--------
 5 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 778e825..347f8cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -357,7 +357,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       }
     }
 
-    keyToNewRecords.clear();
     ((ExternalSpillableMap) keyToNewRecords).close();
     writtenRecordKeys.clear();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8044f84..d31b0aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -256,7 +256,9 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   }
 
   public void close() {
+    inMemoryMap.clear();
     getDiskBasedMap().close();
+    currentInMemoryMapSize = 0L;
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index d24452b..0a4ef30 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -201,7 +202,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
           .filter(logFile -> logFile.getFileSize() > 0)
           .map(logFile -> logFile.getPath().toString())
           .collect(toList());
-      HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+      HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+          writeConfig, hadoopConf);
 
       try {
         for (String recordKey : scanner.getRecords().keySet()) {
@@ -209,6 +211,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         }
       } catch (Exception e) {
         throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
+      } finally {
+        scanner.close();
       }
     }
   }
@@ -218,27 +222,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
-  private HoodieMergedLogRecordScanner scanLog(
-      List<String> logPaths,
-      Schema logSchema,
-      String latestInstantTime) {
-    String basePath = this.hoodieTable.getMetaClient().getBasePath();
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
-        .withBasePath(basePath)
-        .withLogFilePaths(logPaths)
-        .withReaderSchema(logSchema)
-        .withLatestInstantTime(latestInstantTime)
-        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
-        .withReverseReader(false)
-        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
-        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
-        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .build();
-  }
-
   @SuppressWarnings("unchecked")
   public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
     HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index c075b09..cbf1ea7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -83,6 +84,29 @@ public class FormatUtils {
         .build();
   }
 
+  public static HoodieMergedLogRecordScanner scanLog(
+      List<String> logPaths,
+      Schema logSchema,
+      String latestInstantTime,
+      HoodieWriteConfig writeConfig,
+      Configuration hadoopConf) {
+    String basePath = writeConfig.getBasePath();
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(false)
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+        .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+        .build();
+  }
+
   private static Boolean string2Boolean(String s) {
     return "true".equals(s.toLowerCase(Locale.ROOT));
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index aa7453c..4d68242 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -59,7 +59,6 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.IntStream;
@@ -293,15 +292,14 @@ public class MergeOnReadInputFormat
         Long.MAX_VALUE); // read the whole file
   }
 
-  private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
-        FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-    final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+    final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
     // projections. For e.g, if the pk fields are [a, b] but user only select a,
@@ -310,7 +308,7 @@ public class MergeOnReadInputFormat
     final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
     final StringToRowDataConverter converter = pkSemanticLost ?
         null : new StringToRowDataConverter(pkTypes);
-    return new Iterator<RowData>() {
+    return new ClosableIterator<RowData>() {
       private RowData currentRecord;
 
       @Override
@@ -318,7 +316,7 @@ public class MergeOnReadInputFormat
         while (logRecordsKeyIterator.hasNext()) {
           String curAvroKey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
-          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
+          final HoodieRecord<?> hoodieRecord = scanner.getRecords().get(curAvroKey);
           try {
             curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
@@ -359,6 +357,11 @@ public class MergeOnReadInputFormat
       public RowData next() {
         return currentRecord;
       }
+
+      @Override
+      public void close() {
+        scanner.close();
+      }
     };
   }
 
@@ -366,6 +369,11 @@ public class MergeOnReadInputFormat
   //  Inner Class
   // -------------------------------------------------------------------------
 
+  private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+    @Override
+    void close(); // override to not throw exception
+  }
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
 
@@ -453,9 +461,9 @@ public class MergeOnReadInputFormat
 
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
-    LogFileOnlyIterator(Iterator<RowData> iterator) {
+    LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
       this.iterator = iterator;
     }
 
@@ -471,7 +479,9 @@ public class MergeOnReadInputFormat
 
     @Override
    public void close() {
-      // no operation
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
 
@@ -479,7 +489,7 @@ public class MergeOnReadInputFormat
     // base file reader
     private final ParquetColumnarRowSplitReader reader;
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
@@ -488,7 +498,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
+    SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) {
       this.reader = reader;
       this.iterator = iterator;
     }
@@ -517,6 +527,9 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
 
@@ -525,8 +538,8 @@ public class MergeOnReadInputFormat
     private final ParquetColumnarRowSplitReader reader;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
-    // log records
-    private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
+    // scanner
+    private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
     private final Schema requiredSchema;
@@ -559,8 +572,8 @@ public class MergeOnReadInputFormat
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-      this.logKeysIterator = this.logRecords.keySet().iterator();
+      this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+      this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
@@ -582,7 +595,7 @@ public class MergeOnReadInputFormat
           }
         }
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
-        if (logRecords.containsKey(curKey)) {
+        if (scanner.getRecords().containsKey(curKey)) {
           keyToSkip.add(curKey);
           Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
           if (!mergedAvroRecord.isPresent()) {
@@ -608,7 +621,7 @@ public class MergeOnReadInputFormat
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
           Option<IndexedRecord> insertAvroRecord =
-              logRecords.get(curKey).getData().getInsertValue(tableSchema);
+              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -634,13 +647,16 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
     }
 
     private Option<IndexedRecord> mergeRowWithLog(
         RowData curRow,
         String curKey) throws IOException {
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
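
For reference, a minimal sketch of how the reworked FormatUtils.scanLog plus close() pattern is meant to be used by a caller, mirroring the BootstrapFunction change above. This is illustrative only and not part of the commit: the class name ScanLogUsageSketch, the method loadRecordKeys, and its parameters are placeholders, assuming an already-built HoodieWriteConfig and Hadoop Configuration.

import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.format.FormatUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.List;

public class ScanLogUsageSketch {

  // Collects the record keys of the given log files, always releasing the
  // scanner's in-memory and disk-based maps through close().
  public static List<String> loadRecordKeys(
      List<String> logPaths,
      Schema schema,
      String latestInstantTime,
      HoodieWriteConfig writeConfig,
      Configuration hadoopConf) {
    HoodieMergedLogRecordScanner scanner =
        FormatUtils.scanLog(logPaths, schema, latestInstantTime, writeConfig, hadoopConf);
    try {
      return new ArrayList<>(scanner.getRecords().keySet());
    } finally {
      // same pattern as in BootstrapFunction: release the spillable map resources
      scanner.close();
    }
  }
}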