vinothchandar commented on code in PR #12321:
URL: https://github.com/apache/hudi/pull/12321#discussion_r1855506943
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1044,29 +1042,28 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
/**
* Update from {@code HoodieCommitMetadata}.
- *
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
@Override
- public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus, String instantTime) {
+ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
processAndCommit(instantTime, () -> {
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getWritesFileIdEncoding(),
+ dataWriteConfig.getMetadataConfig());
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
- HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpserts(writeStatus);
- HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
- partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
updatesFromWriteStatuses.union(additionalUpdates));
+ HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()),
commitMetadata);
+ partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
- updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
writeStatus);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
instantTime);
Review Comment:
Note to self: we seem to be independently writing these indexes — one for each
partition. As long as there is a single delta commit, this should be fine.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1141,28 +1140,22 @@ private void
updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
- secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, writeStatus);
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, instantTime);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
}
partitionToRecordMap.put(partition, secondaryIndexRecords);
});
}
- private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs =
getPartitionFilePairs(commitMetadata);
// Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
// the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
- List<String> keysToRemove = new ArrayList<>();
- writeStatus.collectAsList().forEach(status -> {
Review Comment:
Wow — this would have OOM-ed anyway, since we are collecting per-record
delegates on the driver.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -42,16 +43,19 @@
public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner {
private final LogRecordScannerCallback callback;
+ private final DeletionCallback deletionCallback;
private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean
reverseReader, int bufferSize,
- LogRecordScannerCallback callback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
+ LogRecordScannerCallback callback,
DeletionCallback deletionCallback,
+ Option<InstantRange> instantRange,
InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
reverseReader, bufferSize, instantRange,
false, true, Option.empty(), internalSchema, Option.empty(),
enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
this.callback = callback;
+ this.deletionCallback = deletionCallback;
Review Comment:
The scanner is read-only — why do we need a deletion callback here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
Review Comment:
Needs a unit test (UT) covering this method.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
Review Comment:
Note to self: verify that this does not copy all the records into memory (or similar).
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+ /**
+ * Generates RLI Metadata records for base files.
+ * If base file is a added to a new file group, all record keys are treated
as inserts.
+ * If a base file is added to an existing file group, we read previous base
file in addition to the latest base file of interest. Find deleted records and
generate RLI Metadata records
+ * for the same in addition to new insert records.
+ * @param basePath base path of the table.
+ * @param writeStat {@link HoodieWriteStat} of interest.
+ * @param writesFileIdEncoding fileID encoding for the table.
+ * @param instantTime instant time of interest.
+ * @param storage instance of {@link HoodieStorage}.
+ * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+ * @throws IOException
+ */
+ public static Iterator<HoodieRecord>
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+
HoodieWriteStat writeStat,
+
Integer writesFileIdEncoding,
+
String instantTime,
+
HoodieStorage storage) throws IOException {
+ String partition = writeStat.getPartitionPath();
+ String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+ String previousFileName = writeStat.getPrevBaseFile();
+ String fileId = FSUtils.getFileId(latestFileName);
+ Set<String> recordKeysFromLatestBaseFile =
getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName);
+ if (previousFileName == null) {
+ return recordKeysFromLatestBaseFile.stream().map(recordKey ->
(HoodieRecord)HoodieMetadataPayload.createRecordIndexUpdate(recordKey,
partition, fileId,
+ instantTime, writesFileIdEncoding)).collect(toList()).iterator();
+ } else {
+ // read from previous base file and find difference to also generate
delete records.
+ // we will return new inserts and deletes from this code block
+ Set<String> recordKeysFromPreviousBaseFile =
getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName);
+ List<HoodieRecord> toReturn = recordKeysFromPreviousBaseFile.stream()
+ .filter(recordKey -> {
+ // deleted record
+ return !recordKeysFromLatestBaseFile.contains(recordKey);
+ }).map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList());
+
+ toReturn.addAll(recordKeysFromLatestBaseFile.stream()
+ .filter(recordKey -> {
+ // new inserts
+ return !recordKeysFromPreviousBaseFile.contains(recordKey);
+ }).map(recordKey ->
+ HoodieMetadataPayload.createRecordIndexUpdate(recordKey,
partition, fileId,
+ instantTime, writesFileIdEncoding)).collect(toList()));
+ return toReturn.iterator();
+ }
+ }
+
+ public static List<String> getRecordKeysDeletedOrUpdated(String basePath,
Review Comment:
There is a method with the same name in HoodieTableMetadataUtil —
getRecordKeysDeletedOrUpdated; consider renaming one of them to avoid confusion.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
Review Comment:
same.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
Review Comment:
We cannot assume the base file is Parquet; we need to check all supported base file types.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1080,12 +1080,11 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
)
verifyQueryPredicate(hudiOpts, "not_record_key_col")
- // update the secondary key column by insert.
- spark.sql(s"insert into $tableName values (5, 'row2', 'efg', 'p2')")
+ // update the secondary key column by update.
+ spark.sql(s"update $tableName set not_record_key_col = 'efg' where
record_key_col = 'row2'")
confirmLastCommitType(ActionType.replacecommit)
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
- Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
Review Comment:
Is this row no longer present because deletes are now handled differently?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -370,9 +381,9 @@ public void setPath(StoragePath basePath, StoragePath path)
{
@Override
public String toString() {
return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path +
'\'' + ", prevCommit='" + prevCommit
- + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes +
", numUpdateWrites=" + numUpdateWrites
- + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" +
totalWriteErrors + ", tempPath='" + tempPath
- + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats)
+ + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" +
numWrites + ", numDeletes=" + numDeletes
Review Comment:
Will reformat / clean this up — it looks too busy.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+ /**
+ * Generates RLI Metadata records for base files.
+ * If base file is a added to a new file group, all record keys are treated
as inserts.
+ * If a base file is added to an existing file group, we read previous base
file in addition to the latest base file of interest. Find deleted records and
generate RLI Metadata records
+ * for the same in addition to new insert records.
+ * @param basePath base path of the table.
+ * @param writeStat {@link HoodieWriteStat} of interest.
+ * @param writesFileIdEncoding fileID encoding for the table.
+ * @param instantTime instant time of interest.
+ * @param storage instance of {@link HoodieStorage}.
+ * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+ * @throws IOException
+ */
+ public static Iterator<HoodieRecord>
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
Review Comment:
we should get into the habit of creating clean classes.. for core
functionality. else, these "helper" methods just grow and grow..
Will clean this up
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1667,51 +1660,6 @@ private void
fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
}
}
- /**
- * Return records that represent upserts to the record index due to write
operation on the dataset.
- *
- * @param writeStatuses {@code WriteStatus} from the write operation
- */
- private HoodieData<HoodieRecord>
getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
Review Comment:
need to confirm this was all just moved somewhere else
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -95,6 +103,14 @@ public interface LogRecordScannerCallback {
void apply(HoodieRecord<?> record) throws Exception;
}
+ /**
+ * A callback for log record scanner to consume deleted HoodieKeys.
+ */
+ @FunctionalInterface
+ public interface DeletionCallback {
Review Comment:
OK — this is a new one, with the same name as the one in LogFormatWriter, where it's
used for log files being deleted.
Rename it to: `RecordDeletionCallback`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -173,6 +190,11 @@ public Builder
withLogRecordScannerCallback(LogRecordScannerCallback callback) {
return this;
}
+ public Builder withLogRecordScannerCallbackForDeletedKeys(DeletionCallback
deletionCallback) {
Review Comment:
Same naming concern as the callback itself — rename this builder method to match the suggested `RecordDeletionCallback` name.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath,
HoodieBaseFile baseFileTo
try {
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+ writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
Review Comment:
Why store just the base file? E.g., if the merge handle was called during
compaction, don't we need the entire file slice?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+ /**
+ * Generates RLI Metadata records for base files.
+ * If base file is a added to a new file group, all record keys are treated
as inserts.
+ * If a base file is added to an existing file group, we read previous base
file in addition to the latest base file of interest. Find deleted records and
generate RLI Metadata records
+ * for the same in addition to new insert records.
+ * @param basePath base path of the table.
+ * @param writeStat {@link HoodieWriteStat} of interest.
+ * @param writesFileIdEncoding fileID encoding for the table.
+ * @param instantTime instant time of interest.
+ * @param storage instance of {@link HoodieStorage}.
+ * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+ * @throws IOException
+ */
+ public static Iterator<HoodieRecord>
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+
HoodieWriteStat writeStat,
+
Integer writesFileIdEncoding,
Review Comment:
Is there no enum for this?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
Review Comment:
What if there are two delete records — say there is no precombine during the
write? I think we need to handle the (false, false) and (true, true) cases
differently.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFiles) {
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat,
storage).iterator();
+ } else {
+ // for logs, every entry is either an update or a delete
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ return getRecordKeys(fullFilePath.toString(),
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize,
instantTime).iterator();
+ }
+ }).collectAsList();
Review Comment:
This is collecting something at record level; we will need to check for OOM.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
Review Comment:
We need a unit test for `getDeletedRecordKeys`.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
Review Comment:
good catch
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1311,6 +1447,53 @@ public static
List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeM
return Collections.emptyList();
}
+ @VisibleForTesting
+ public static Set<String> getDeletedRecordKeys(String filePath,
HoodieTableMetaClient datasetMetaClient,
+ Option<Schema>
writerSchemaOpt, int maxBufferSize,
+ String latestCommitTimestamp)
throws IOException {
+ if (writerSchemaOpt.isPresent()) {
+ // read log file records without merging
+ Set<String> deletedKeys = new HashSet<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(datasetMetaClient.getStorage())
+ .withBasePath(datasetMetaClient.getBasePath())
+ .withLogFilePaths(Collections.singletonList(filePath))
+ .withBufferSize(maxBufferSize)
+ .withLatestInstantTime(latestCommitTimestamp)
+ .withReaderSchema(writerSchemaOpt.get())
+ .withTableMetaClient(datasetMetaClient)
+ .withLogRecordScannerCallbackForDeletedKeys(deletedKey ->
deletedKeys.add(deletedKey.getRecordKey()))
+ .build();
+ scanner.scan();
+ return deletedKeys;
+ }
+ return Collections.emptySet();
+ }
+
+ @VisibleForTesting
+ public static Set<String> getRecordKeys(String filePath,
HoodieTableMetaClient datasetMetaClient,
+ Option<Schema>
writerSchemaOpt, int maxBufferSize,
+ String latestCommitTimestamp)
throws IOException {
+ if (writerSchemaOpt.isPresent()) {
Review Comment:
Code duplication with `getDeletedRecordKeys` — consider extracting a shared
helper.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
Review Comment:
Code duplication — this write-stat scan repeats the logic above; consider
extracting a shared helper.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath,
HoodieBaseFile baseFileTo
try {
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+ writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
Review Comment:
And do the other handles set this — yes? No? Why?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFiles) {
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat,
storage).iterator();
+ } else {
+ // for logs, every entry is either an update or a delete
Review Comment:
  Do we throw an error if we find numInserts > 0 for log files?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]