[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator

2023-08-14 Thread via GitHub


yihua commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294125375


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##
@@ -741,6 +791,116 @@ private void validateBloomFilters(
 validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+   HoodieTableMetaClient metaClient,
+   HoodieTableMetadata tableMetadata) {
+if (cfg.validateRecordIndexContent) {
+  validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+} else if (cfg.validateRecordIndexCount) {
+  validateRecordIndexCount(sparkEngineContext, metaClient);
+}
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+HoodieTableMetaClient metaClient) {
+String basePath = metaClient.getBasePathV2().toString();
+long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(basePath)
+.select(RECORD_KEY_METADATA_FIELD)
+.distinct()
+.count();
+long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(getMetadataTableBasePath(basePath))
+.select("key")
+.filter("type = 5")
+.distinct()

Review Comment:
   The `distinct()` operation is removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator

2023-08-14 Thread via GitHub


yihua commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294125576


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##
@@ -741,6 +791,116 @@ private void validateBloomFilters(
 validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+   HoodieTableMetaClient metaClient,
+   HoodieTableMetadata tableMetadata) {
+if (cfg.validateRecordIndexContent) {
+  validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+} else if (cfg.validateRecordIndexCount) {
+  validateRecordIndexCount(sparkEngineContext, metaClient);
+}
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+HoodieTableMetaClient metaClient) {
+String basePath = metaClient.getBasePathV2().toString();
+long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(basePath)
+.select(RECORD_KEY_METADATA_FIELD)
+.distinct()
+.count();
+long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(getMetadataTableBasePath(basePath))
+.select("key")
+.filter("type = 5")
+.distinct()
+.count();
+
+if (countKeyFromTable != countKeyFromRecordIndex) {
+  String message = String.format("Validation of record index count failed: 
"
+  + "%s entries from record index metadata, %s keys from the data 
table.",
+  countKeyFromRecordIndex, countKeyFromTable);
+  LOG.error(message);
+  throw new HoodieValidationException(message);
+} else {
+  LOG.info(String.format(
+  "Validation of record index count succeeded: %s entries.", 
countKeyFromRecordIndex));
+}
+  }
+
+  private void validateRecordIndexContent(HoodieSparkEngineContext 
sparkEngineContext,
+  HoodieTableMetaClient metaClient,
+  HoodieTableMetadata tableMetadata) {
+String basePath = metaClient.getBasePathV2().toString();
+JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+.select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, 
FILENAME_METADATA_FIELD)
+.toJavaRDD()
+.mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))
+.cache();
+
+JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+sparkEngineContext.getSqlContext().read().format("hudi")
+.load(getMetadataTableBasePath(basePath))
+.filter("type = 5")
+.select(functions.col("key"),
+
functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+
functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+
functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+functions.col("recordIndexMetadata.fileId").as("fileId"),
+
functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+.toJavaRDD()
+.mapToPair(row -> {
+  HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+  row.getString(row.fieldIndex("partitionName")),
+  row.getInt(row.fieldIndex("fileIdEncoding")),
+  row.getLong(row.fieldIndex("fileIdHighBits")),
+  row.getLong(row.fieldIndex("fileIdLowBits")),
+  row.getInt(row.fieldIndex("fileIndex")),
+  row.getString(row.fieldIndex("fileId")),
+  row.getLong(row.fieldIndex("instantTime")));
+  return new Tuple2<>(row.getString(row.fieldIndex("key")),
+  Pair.of(location.getPartitionPath(), location.getFileId()));
+});
+
+long diffCount = 
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, 
cfg.recordIndexParallelism)
+.map(e -> {
+  Optional<Pair<String, String>> locationOnFs = e._2._1;
+  Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+  if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) 
{
+if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+   

[GitHub] [hudi] yihua commented on a diff in pull request #9437: [HUDI-6689] Add record index validation in MDT validator

2023-08-14 Thread via GitHub


yihua commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294049901


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##
@@ -741,6 +791,116 @@ private void validateBloomFilters(
 validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+   HoodieTableMetaClient metaClient,
+   HoodieTableMetadata tableMetadata) {
+if (cfg.validateRecordIndexContent) {
+  validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+} else if (cfg.validateRecordIndexCount) {
+  validateRecordIndexCount(sparkEngineContext, metaClient);
+}
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+HoodieTableMetaClient metaClient) {
+String basePath = metaClient.getBasePathV2().toString();
+long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(basePath)
+.select(RECORD_KEY_METADATA_FIELD)
+.distinct()
+.count();
+long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+.load(getMetadataTableBasePath(basePath))
+.select("key")
+.filter("type = 5")
+.distinct()
+.count();
+
+if (countKeyFromTable != countKeyFromRecordIndex) {
+  String message = String.format("Validation of record index count failed: 
"
+  + "%s entries from record index metadata, %s keys from the data 
table.",
+  countKeyFromRecordIndex, countKeyFromTable);
+  LOG.error(message);
+  throw new HoodieValidationException(message);
+} else {
+  LOG.info(String.format(
+  "Validation of record index count succeeded: %s entries.", 
countKeyFromRecordIndex));
+}
+  }
+
+  private void validateRecordIndexContent(HoodieSparkEngineContext 
sparkEngineContext,
+  HoodieTableMetaClient metaClient,
+  HoodieTableMetadata tableMetadata) {
+String basePath = metaClient.getBasePathV2().toString();
+JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+.select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, 
FILENAME_METADATA_FIELD)
+.toJavaRDD()
+.mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))
+.cache();
+
+JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+sparkEngineContext.getSqlContext().read().format("hudi")
+.load(getMetadataTableBasePath(basePath))
+.filter("type = 5")
+.select(functions.col("key"),
+
functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+
functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+
functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+functions.col("recordIndexMetadata.fileId").as("fileId"),
+
functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+.toJavaRDD()
+.mapToPair(row -> {
+  HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+  row.getString(row.fieldIndex("partitionName")),
+  row.getInt(row.fieldIndex("fileIdEncoding")),
+  row.getLong(row.fieldIndex("fileIdHighBits")),
+  row.getLong(row.fieldIndex("fileIdLowBits")),
+  row.getInt(row.fieldIndex("fileIndex")),
+  row.getString(row.fieldIndex("fileId")),
+  row.getLong(row.fieldIndex("instantTime")));
+  return new Tuple2<>(row.getString(row.fieldIndex("key")),
+  Pair.of(location.getPartitionPath(), location.getFileId()));
+});
+
+long diffCount = 
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, 
cfg.recordIndexParallelism)
+.map(e -> {
+  Optional<Pair<String, String>> locationOnFs = e._2._1;
+  Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+  if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) 
{
+if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+