[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator
yihua commented on code in PR #9437: URL: https://github.com/apache/hudi/pull/9437#discussion_r1294125375 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java: ## @@ -741,6 +791,116 @@ private void validateBloomFilters( validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters"); } + private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext, + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata) { +if (cfg.validateRecordIndexContent) { + validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata); +} else if (cfg.validateRecordIndexCount) { + validateRecordIndexCount(sparkEngineContext, metaClient); +} + } + + private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext, +HoodieTableMetaClient metaClient) { +String basePath = metaClient.getBasePathV2().toString(); +long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi") +.load(basePath) +.select(RECORD_KEY_METADATA_FIELD) +.distinct() +.count(); +long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi") +.load(getMetadataTableBasePath(basePath)) +.select("key") +.filter("type = 5") +.distinct() Review Comment: The `distinct()` operation is removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator
yihua commented on code in PR #9437: URL: https://github.com/apache/hudi/pull/9437#discussion_r1294125576 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java: ## @@ -741,6 +791,116 @@ private void validateBloomFilters( validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters"); } + private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext, + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata) { +if (cfg.validateRecordIndexContent) { + validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata); +} else if (cfg.validateRecordIndexCount) { + validateRecordIndexCount(sparkEngineContext, metaClient); +} + } + + private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext, +HoodieTableMetaClient metaClient) { +String basePath = metaClient.getBasePathV2().toString(); +long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi") +.load(basePath) +.select(RECORD_KEY_METADATA_FIELD) +.distinct() +.count(); +long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi") +.load(getMetadataTableBasePath(basePath)) +.select("key") +.filter("type = 5") +.distinct() +.count(); + +if (countKeyFromTable != countKeyFromRecordIndex) { + String message = String.format("Validation of record index count failed: " + + "%s entries from record index metadata, %s keys from the data table.", + countKeyFromRecordIndex, countKeyFromTable); + LOG.error(message); + throw new HoodieValidationException(message); +} else { + LOG.info(String.format( + "Validation of record index count succeeded: %s entries.", countKeyFromRecordIndex)); +} + } + + private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext, + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata) { +String basePath = metaClient.getBasePathV2().toString(); +JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd = 
+sparkEngineContext.getSqlContext().read().format("hudi").load(basePath) +.select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD) +.toJavaRDD() +.mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)), + Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)), + FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD)) +.cache(); + +JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd = +sparkEngineContext.getSqlContext().read().format("hudi") +.load(getMetadataTableBasePath(basePath)) +.filter("type = 5") +.select(functions.col("key"), + functions.col("recordIndexMetadata.partitionName").as("partitionName"), + functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"), + functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"), +functions.col("recordIndexMetadata.fileIndex").as("fileIndex"), +functions.col("recordIndexMetadata.fileId").as("fileId"), + functions.col("recordIndexMetadata.instantTime").as("instantTime"), + functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding")) +.toJavaRDD() +.mapToPair(row -> { + HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo( + row.getString(row.fieldIndex("partitionName")), + row.getInt(row.fieldIndex("fileIdEncoding")), + row.getLong(row.fieldIndex("fileIdHighBits")), + row.getLong(row.fieldIndex("fileIdLowBits")), + row.getInt(row.fieldIndex("fileIndex")), + row.getString(row.fieldIndex("fileId")), + row.getLong(row.fieldIndex("instantTime"))); + return new Tuple2<>(row.getString(row.fieldIndex("key")), + Pair.of(location.getPartitionPath(), location.getFileId())); +}); + +long diffCount = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism) +.map(e -> { + Optional<Pair<String, String>> locationOnFs = e._2._1; + Optional<Pair<String, String>> locationFromRecordIndex = e._2._2; + if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) { +if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft()) +
[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator
yihua commented on code in PR #9437: URL: https://github.com/apache/hudi/pull/9437#discussion_r1294049901 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java: ## @@ -741,6 +791,116 @@ private void validateBloomFilters( validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters"); } + private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext, + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata) { +if (cfg.validateRecordIndexContent) { + validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata); +} else if (cfg.validateRecordIndexCount) { + validateRecordIndexCount(sparkEngineContext, metaClient); +} + } + + private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext, +HoodieTableMetaClient metaClient) { +String basePath = metaClient.getBasePathV2().toString(); +long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi") +.load(basePath) +.select(RECORD_KEY_METADATA_FIELD) +.distinct() +.count(); +long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi") +.load(getMetadataTableBasePath(basePath)) +.select("key") +.filter("type = 5") +.distinct() +.count(); + +if (countKeyFromTable != countKeyFromRecordIndex) { + String message = String.format("Validation of record index count failed: " + + "%s entries from record index metadata, %s keys from the data table.", + countKeyFromRecordIndex, countKeyFromTable); + LOG.error(message); + throw new HoodieValidationException(message); +} else { + LOG.info(String.format( + "Validation of record index count succeeded: %s entries.", countKeyFromRecordIndex)); +} + } + + private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext, + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata) { +String basePath = metaClient.getBasePathV2().toString(); +JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd = 
+sparkEngineContext.getSqlContext().read().format("hudi").load(basePath) +.select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD) +.toJavaRDD() +.mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)), + Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)), + FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD)) +.cache(); + +JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd = +sparkEngineContext.getSqlContext().read().format("hudi") +.load(getMetadataTableBasePath(basePath)) +.filter("type = 5") +.select(functions.col("key"), + functions.col("recordIndexMetadata.partitionName").as("partitionName"), + functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"), + functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"), +functions.col("recordIndexMetadata.fileIndex").as("fileIndex"), +functions.col("recordIndexMetadata.fileId").as("fileId"), + functions.col("recordIndexMetadata.instantTime").as("instantTime"), + functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding")) +.toJavaRDD() +.mapToPair(row -> { + HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo( + row.getString(row.fieldIndex("partitionName")), + row.getInt(row.fieldIndex("fileIdEncoding")), + row.getLong(row.fieldIndex("fileIdHighBits")), + row.getLong(row.fieldIndex("fileIdLowBits")), + row.getInt(row.fieldIndex("fileIndex")), + row.getString(row.fieldIndex("fileId")), + row.getLong(row.fieldIndex("instantTime"))); + return new Tuple2<>(row.getString(row.fieldIndex("key")), + Pair.of(location.getPartitionPath(), location.getFileId())); +}); + +long diffCount = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism) +.map(e -> { + Optional<Pair<String, String>> locationOnFs = e._2._1; + Optional<Pair<String, String>> locationFromRecordIndex = e._2._2; + if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) { +if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft()) +