This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new c5b5953b0b8 [HUDI-6467] Fix deletes handling in rli when partition path is updated (#9114) c5b5953b0b8 is described below commit c5b5953b0b8666aefb3b51cba29ac1727154f62c Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Tue Jul 4 03:26:30 2023 +0530 [HUDI-6467] Fix deletes handling in rli when partition path is updated (#9114) * [HUDI-6467] Fix deletes handling in rli when partition path is updated --------- Co-authored-by: sivabalan <n.siv...@gmail.com> --- .../metadata/HoodieBackedTableMetadataWriter.java | 46 ++++++++++++++-------- .../common/model/HoodieRecordGlobalLocation.java | 5 ++- .../apache/hudi/metadata/BaseTableMetadata.java | 7 +++- .../TestGlobalIndexEnableUpdatePartitions.java | 6 +-- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 0908ad79708..a7b45ee6524 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -82,9 +82,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; @@ -299,7 +301,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta exists = false; } - return exists; + return exists; } /** @@ -489,7 +491,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Read the record keys from base files in partitions and return records. */ private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, - List<Pair<String, String>> partitionBaseFilePairs) { + List<Pair<String, String>> partitionBaseFilePairs) { if (partitionBaseFilePairs.isEmpty()) { return engineContext.emptyHoodieData(); } @@ -1101,7 +1103,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .getCommitTimeline().filterCompletedInstants().lastInstant(); if (lastCompletedCompactionInstant.isPresent() && metadataMetaClient.getActiveTimeline().filterCompletedInstants() - .findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants() < 3) { + .findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants() < 3) { // do not clean the log files immediately after compaction to give some buffer time for metadata table reader, // because there is case that the reader has prepared for the log file readers already before the compaction completes // while before/during the reading of the log files, the cleaning triggers and delete the reading files, @@ -1159,10 +1161,27 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param writeStatuses {@code WriteStatus} from the write operation */ private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) { - return writeStatuses.flatMap(writeStatus -> { - List<HoodieRecord> recordList = new LinkedList<>(); - for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) { - if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) { + // 1. List<HoodieRecordDelegate> + // 2. Reduce by key: accept keys only when new location is not + return writeStatuses.map(writeStatus -> writeStatus.getWrittenRecordDelegates().stream() + .map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(), recordDelegate))) + .flatMapToPair(Stream::iterator) + .reduceByKey((recordDelegate1, recordDelegate2) -> { + if (recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) { + if (recordDelegate1.getNewLocation().isPresent() && recordDelegate1.getNewLocation().get().getFileId() != null) { + return recordDelegate1; + } else if (recordDelegate2.getNewLocation().isPresent() && recordDelegate2.getNewLocation().get().getFileId() != null) { + return recordDelegate2; + } else { + // should not come here, one of the above must have a new location set + return null; + } + } else { + return recordDelegate1; + } + }, 1) + .map(writeStatusRecordDelegate -> { + HoodieRecordDelegate recordDelegate = writeStatusRecordDelegate.getValue(); HoodieRecord hoodieRecord; Option<HoodieRecordLocation> newLocation = recordDelegate.getNewLocation(); if (newLocation.isPresent()) { @@ -1176,9 +1195,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); LOG.error(msg); throw new HoodieMetadataException(msg); - } else { - // TODO: This may be required for clustering use-cases where record location changes - continue; } } @@ -1189,13 +1205,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // Delete existing index for a deleted record hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey()); } - - recordList.add(hoodieRecord); - } - } - - return recordList.iterator(); - }); + return hoodieRecord; + }) + .filter(Objects::nonNull); } protected void closeInternal() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java index 8c021d902a3..4121a334548 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java @@ -32,7 +32,8 @@ public final class HoodieRecordGlobalLocation extends HoodieRecordLocation { private String partitionPath; - public HoodieRecordGlobalLocation() {} + public HoodieRecordGlobalLocation() { + } public HoodieRecordGlobalLocation(String partitionPath, String instantTime, String fileId) { super(instantTime, fileId); @@ -98,7 +99,7 @@ public final class HoodieRecordGlobalLocation extends HoodieRecordLocation { } @Override - public final void write(Kryo kryo, Output output) { + public void write(Kryo kryo, Output output) { super.write(kryo, output); kryo.writeObjectOrNull(output, partitionPath, String.class); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 4cfbc998237..1381d7e5268 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -289,8 +289,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { Map<String, HoodieRecord<HoodieMetadataPayload>> result = getRecordsByKeys(recordKeys, MetadataPartitionType.RECORD_INDEX.getPartitionPath()); Map<String, HoodieRecordGlobalLocation> recordKeyToLocation = new HashMap<>(result.size()); - result.forEach((key, record) -> recordKeyToLocation.put(key, record.getData().getRecordGlobalLocation())); - + result.forEach((key, record) -> { + if (!record.getData().isDeleted()) { + recordKeyToLocation.put(key, record.getData().getRecordGlobalLocation()); + } + }); return recordKeyToLocation; } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java index 677e478ffb8..7d10c138b1e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java @@ -65,8 +65,8 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional Arguments.of(COPY_ON_WRITE, GLOBAL_BLOOM), Arguments.of(COPY_ON_WRITE, RECORD_INDEX), Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE), - Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM), - Arguments.of(MERGE_ON_READ, RECORD_INDEX) + Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM) + // Arguments.of(MERGE_ON_READ, RECORD_INDEX) ); } @@ -123,7 +123,6 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional client.startCommitWithTime(commitTimeAtEpoch9); assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch9, 2), commitTimeAtEpoch9).collect()); readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9); - } } @@ -179,7 +178,6 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional client.startCommitWithTime(commitTimeAtEpoch9); assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch9, 2), commitTimeAtEpoch9).collect()); readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9); - } }