[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185997301


##
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java:
##
@@ -149,6 +154,59 @@ public void testResizing(boolean isSplit) throws IOException {
 });
   }
 
+  /***

Review Comment:
   ```suggestion
 /**
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996892


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option getRecordLocation(HoodieKey key) {
  }

  LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
-      + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
+      + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
  throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
}
  }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String hoodieInstant) {
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+        ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+    if (!instantPlanPair.isPresent()) {
+      return;
+    }
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    List<Map<String, String>> partitionMapList = plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+    partitionMapList.stream().forEach(partitionMap -> {
+      String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      Path metadataPartitionPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
+      Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + HASHING_METADATA_FILE_SUFFIX);
+      try {
+        if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+          createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("exception while committing hashing metadata for path " + metadataFilePath, e);
+      }
+    });
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    String metadata = "";
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from given file
+   * @param table
+   * @param metaFile
+   * @return
+   */
+  private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+    try {
+      if (metaFile == null) {
+        return Option.empty();
+      }
+      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+    } catch (FileNotFoundException e) {
+      return Option.empty();
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing metadata, for path: " + metaFile.getPath().getName(), e);
+      throw new HoodieIndexException("Error while loading hashing metadata", e);
+    }
+  }
+
+  /***
+   * COMMIT MARKER RECOVERY JOB.
+   * If particular hashing metadta file doesn't have commit marker then there could be a case where clustering is done but post commit marker
+   * creation operation failed. In this case this method will check file group id from consistent hashing metadata against storage base file group ids.
+   * if one of the file group matches then we can conclude that this is the latest metadata file.
+   * Note : we will end up calling this method if there is no marker file and no replace commit on active timeline, if replace commit is not present on
+   * active timeline that means old file group id's before clustering operation got cleaned and only new file group id's of current clustering operation
+   * are present on the disk.
+   * @param table

Review Comment:
   Add comments for all parameters and the return value.




[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996438


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+  try {
+if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+  createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+}
+  } catch (IOException e) {
+throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+  }
+});
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+Path fullPath = new Path(partitionPath, 
getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+if (fs.exists(fullPath)) {
+  return;
+}
+String metadata = "";
+FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from given file
+   * @param table

Review Comment:
   Add comments for all parameters and the return value.
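   One possible wording for the requested Javadoc, inferred from the method body rather than taken from the PR:

   ```java
   /**
    * Loads consistent hashing metadata of the table from the given meta file.
    *
    * @param table    hoodie table whose file system is used to open the meta file
    * @param metaFile file status of the hashing metadata file to read; may be null
    * @return the parsed {@link HoodieConsistentHashingMetadata}, or {@code Option.empty()} if the file is absent
    */
   private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
     // body unchanged from the PR
   }
   ```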



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, 

[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996328


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+  try {
+if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+  createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+}
+  } catch (IOException e) {
+throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+  }
+});
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+Path fullPath = new Path(partitionPath, 
getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+if (fs.exists(fullPath)) {
+  return;
+}
+String metadata = "";
+FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from given file

Review Comment:
   ```suggestion
  * Loads consistent hashing metadata of table from the given meta file.
   ```



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = 

[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185995971


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+  try {
+if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+  createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+}
+  } catch (IOException e) {
+throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+  }
+});
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+Path fullPath = new Path(partitionPath, 
getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+if (fs.exists(fullPath)) {
+  return;
+}
+String metadata = "";
+FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));

Review Comment:
   ```suggestion
    FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185995107


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+  try {
+if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+  createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+}
+  } catch (IOException e) {
+throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+  }
+});
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table

Review Comment:
   Add comments for all parameters.
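   One possible wording for the requested parameter docs, again inferred from the method body rather than taken from the PR:

   ```java
   /**
    * Creates commit marker corresponding to hashing metadata file after post commit clustering operation.
    *
    * @param table         hoodie table whose wrapper file system is used to write the marker
    * @param fileStatus    path of the hashing metadata file the marker corresponds to
    * @param partitionPath hashing metadata partition path under which the marker is created
    * @throws IOException if the marker file cannot be created
    */
   private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
     // body unchanged from the PR
   }
   ```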






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185994632


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -271,8 +327,114 @@ public Option 
getRecordLocation(HoodieKey key) {
   }
 
   LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+  + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify latest commited file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+Option> instantPlanPair =
+ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+if (!instantPlanPair.isPresent()) {
+  return;
+}
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+  Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+  try {
+if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+  createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+}
+  } catch (IOException e) {
+throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+  }
+});
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation

Review Comment:
   ```suggestion
   * Creates commit marker corresponding to hashing metadata file after post commit clustering operation.
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185994125


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java:
##
@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType operationType) {
   public void close() {
   }
 
+  /***
+   * Update index metadata
+   * @param table
+   * @param hoodieInstant

Review Comment:
   ```suggestion
  * @param hoodieInstant The instant to commit.
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185993420


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java:
##
@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType 
operationType) {
   public void close() {
   }
 
+  /***
+   * Update index metadata

Review Comment:
   ```suggestion
  * Updates index metadata of the given table and instant if needed.
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185993910


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java:
##
@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType 
operationType) {
   public void close() {
   }
 
+  /***
+   * Update index metadata
+   * @param table

Review Comment:
   ```suggestion
  * @param table The committed table.
   ```









[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-05-05 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185992732


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -513,7 +513,6 @@ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
 if (config.isMetaserverEnabled()) {
   return Stream.empty();
 }
-

Review Comment:
   No actual change on this line. Please revert.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-28 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1180044369


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
  }

  private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
-    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+    List<HoodieInstant> commitInstantsToArchive = getCommitInstantsToArchive().collect(Collectors.toList());
+    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+    HoodieInstant hoodieOldestInstantToArchive = commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> maxInstant.getTimestamp())).orElse(null);
+    /**
+     * if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
+     */
+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));

Review Comment:
   @rohan-uptycs, makes sense to me.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-27 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1179114163


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**
+ * if hoodieOldestInstantToArchive is null that means nothing is getting 
archived, so no need to update metadata
+ */
+if (hoodieOldestInstantToArchive != null) {
+  table.getIndex().updateMetadata(table, 
Option.of(hoodieOldestInstantToArchive));

Review Comment:
   @rohan-uptycs, `updateMetadata` could be invoked once `postCommit` executes successfully.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-26 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1178579855


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**
+ * if hoodieOldestInstantToArchive is null that means nothing is getting 
archived, so no need to update metadata
+ */
+if (hoodieOldestInstantToArchive != null) {
+  table.getIndex().updateMetadata(table, 
Option.of(hoodieOldestInstantToArchive));

Review Comment:
   @rohan-uptycs, makes sense to me from an interface point of view.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-26 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1177521748


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**
+ * if hoodieOldestInstantToArchive is null that means nothing is getting 
archived, so no need to update metadata
+ */
+if (hoodieOldestInstantToArchive != null) {
+  table.getIndex().updateMetadata(table, 
Option.of(hoodieOldestInstantToArchive));

Review Comment:
   @rohan-uptycs, `getInstantsToArchive` is also only used to get the instants to archive, therefore by design the update behavior shouldn't be invoked in this method.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-26 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1177385692


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**

Review Comment:
   Remove lines 515 and 517.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-26 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1177385296


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**
+ * if hoodieOldestInstantToArchive is null that means nothing is getting 
archived, so no need to update metadata
+ */
+if (hoodieOldestInstantToArchive != null) {
+  table.getIndex().updateMetadata(table, 
Option.of(hoodieOldestInstantToArchive));

Review Comment:
   I don't think invoking `updateMetadata` in `getInstantsToArchive` makes sense.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-26 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1177384033


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -509,7 +509,15 @@ private Stream getCommitInstantsToArchive() 
throws IOException {
   }
 
   private Stream getInstantsToArchive() throws IOException {
-Stream instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+List commitInstantsToArchive = 
getCommitInstantsToArchive().collect(Collectors.toList());
+Stream instants = 
Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+HoodieInstant hoodieOldestInstantToArchive = 
commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> 
maxInstant.getTimestamp())).orElse(null);
+/**
+ * if hoodieOldestInstantToArchive is null that means nothing is getting 
archived, so no need to update metadata

Review Comment:
   ```suggestion
    // if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175964368


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##
@@ -441,6 +441,8 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
  Option<HoodieInstant> oldestInstantToRetainForClustering =
      ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
 
+  table.getIndex().updateMetadata(table);
+

Review Comment:
   @rohan-uptycs, could `updateMetadata` be invoked after archiving, or by the caller of `getCommitInstantsToArchive`? IMO, `getCommitInstantsToArchive` only gets the commit instants to archive and should have no update behavior. Therefore, `updateMetadata` should not be triggered in `getCommitInstantsToArchive`.
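   A minimal sketch of the shape being asked for here, with the index update driven only after archiving; the method name `archiveAndUpdateIndexMetadata`, the `HoodieEngineContext` parameter and the exact `archive(...)` call are illustrative assumptions, not code from the PR:

   ```java
   private void archiveAndUpdateIndexMetadata(HoodieEngineContext context) throws IOException {
     // getCommitInstantsToArchive()/getInstantsToArchive() stay pure reads with no side effects
     List<HoodieInstant> commitInstantsToArchive = getCommitInstantsToArchive().collect(Collectors.toList());
     List<HoodieInstant> instantsToArchive =
         Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream()).collect(Collectors.toList());
     archive(context, instantsToArchive); // existing archive step (signature assumed)
     // only after archiving succeeded, let the index metadata catch up
     commitInstantsToArchive.stream()
         .max(Comparator.comparing(HoodieInstant::getTimestamp))
         .ifPresent(oldest -> table.getIndex().updateMetadata(table, Option.of(oldest)));
   }
   ```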






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175229521


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,65 @@ public Option getRecordLocation(HoodieKey key) {
  throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
}
  }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);
+    // Update metadata for replace commit which are going to get archived.
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant ->
+        hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, replaceInstantToKeep.getTimestamp())).orElse(true));
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+          ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp()));
+      if (instantPlanPair.isPresent()) {
+        HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+        List<Map<String, String>> partitionMapList = plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+        partitionMapList.stream().forEach(partitionMap -> {
+          String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+          if (!partitionVisiteddMap.containsKey(partition)) {
+            Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
+            if (hoodieConsistentHashingMetadataOption.isPresent()) {
+              try {
+                overWriteMetadata(table, hoodieConsistentHashingMetadataOption.get(), HoodieTimeline.INIT_INSTANT_TS + HASHING_METADATA_FILE_SUFFIX);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+            partitionVisiteddMap.put(partition, Boolean.TRUE);
+          }
+        });
+      }
+    });
+  }
+
+  private Option<HoodieInstant> getOldestInstantToRetain(HoodieTable table) {
+    try {
+      Option<HoodieInstant> oldestInstantToRetainForClustering =
+          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+      return oldestInstantToRetainForClustering;
+    } catch (IOException e) {
+      LOG.error("Error while getting oldest instant to retain info: ", e);
+      return Option.empty();
+    }
+  }
+
+  private boolean overwriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, fileName);
+    try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
+      byte[] bytes = metadata.toBytes();
+      fsOut.write(bytes);
+    }
+    byte[] bytes = metadata.toBytes();

Review Comment:
   You need to remove lines 338~340.






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175224217


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,62 @@ public Option 
getRecordLocation(HoodieKey key) {
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest 
committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+Map partitionVisiteddMap = new HashMap<>();
+Option hoodieOldestReplaceInstantToKeep = 
getOldestInstantToRetain(table);
+// Update metadata for replace commit which are going to get archived.
+HoodieTimeline hoodieTimeline = 
table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant ->
+hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, 
replaceInstantToKeep.getTimestamp())).orElse(true));
+hoodieTimeline.getInstants().forEach(instant -> {
+  Option> instantPlanPair =
+  ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp()));
+  if (instantPlanPair.isPresent()) {
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  if (!partitionVisiteddMap.containsKey(partition)) {
+Option 
hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
+if (hoodieConsistentHashingMetadataOption.isPresent()) {
+  try {
+overWriteMetadata(table, 
hoodieConsistentHashingMetadataOption.get(), HoodieTimeline.INIT_INSTANT_TS + 
HASHING_METADATA_FILE_SUFFIX);
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+partitionVisiteddMap.put(partition, Boolean.TRUE);
+  }
+});
+  }
+});
+  }
+
+  private Option getOldestInstantToRetain(HoodieTable table) {
+try {
+  Option oldestInstantToRetainForClustering =
+  
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+  return oldestInstantToRetainForClustering;
+} catch (IOException e) {
+  LOG.error("Error while getting oldest instant to retain info: ", e);
+  return Option.empty();
+}
+  }
+
+  private boolean overWriteMetadata(HoodieTable table, 
HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+Path dir = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
metadata.getPartitionPath());
+Path fullPath = new Path(dir, fileName);
+FSDataOutputStream fsOut = fs.create(fullPath, true);
+byte[] bytes = metadata.toBytes();
+fsOut.write(bytes);
+fsOut.close();

Review Comment:
   After the above suggestion, `fsOut.close()` is invoked automatically when the try-with-resources block exits (as if in a finally block).
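   For illustration, a minimal sketch of that behavior, reusing the variables from the method above:

   ```java
   try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
     fsOut.write(metadata.toBytes());
   } // fsOut.close() runs here automatically, even if write(...) throws
   ```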






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175223280


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,62 @@ public Option 
getRecordLocation(HoodieKey key) {
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest 
committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+Map partitionVisiteddMap = new HashMap<>();
+Option hoodieOldestReplaceInstantToKeep = 
getOldestInstantToRetain(table);
+// Update metadata for replace commit which are going to get archived.
+HoodieTimeline hoodieTimeline = 
table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant ->
+hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, 
replaceInstantToKeep.getTimestamp())).orElse(true));
+hoodieTimeline.getInstants().forEach(instant -> {
+  Option> instantPlanPair =
+  ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp()));
+  if (instantPlanPair.isPresent()) {
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  if (!partitionVisiteddMap.containsKey(partition)) {
+Option 
hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
+if (hoodieConsistentHashingMetadataOption.isPresent()) {
+  try {
+overWriteMetadata(table, 
hoodieConsistentHashingMetadataOption.get(), HoodieTimeline.INIT_INSTANT_TS + 
HASHING_METADATA_FILE_SUFFIX);
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+partitionVisiteddMap.put(partition, Boolean.TRUE);
+  }
+});
+  }
+});
+  }
+
+  private Option getOldestInstantToRetain(HoodieTable table) {
+try {
+  Option oldestInstantToRetainForClustering =
+  
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+  return oldestInstantToRetainForClustering;
+} catch (IOException e) {
+  LOG.error("Error while getting oldest instant to retain info: ", e);
+  return Option.empty();
+}
+  }
+
+  private boolean overWriteMetadata(HoodieTable table, 
HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+Path dir = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
metadata.getPartitionPath());
+Path fullPath = new Path(dir, fileName);
+FSDataOutputStream fsOut = fs.create(fullPath, true);

Review Comment:
   ```suggestion
    try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
      byte[] bytes = metadata.toBytes();
      fsOut.write(bytes);
    }
   ```






[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175220178


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,62 @@ public Option 
getRecordLocation(HoodieKey key) {
   throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
 }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest 
committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+Map partitionVisiteddMap = new HashMap<>();
+Option hoodieOldestReplaceInstantToKeep = 
getOldestInstantToRetain(table);
+// Update metadata for replace commit which are going to get archived.
+HoodieTimeline hoodieTimeline = 
table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant ->
+hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, 
replaceInstantToKeep.getTimestamp())).orElse(true));
+hoodieTimeline.getInstants().forEach(instant -> {
+  Option> instantPlanPair =
+  ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp()));
+  if (instantPlanPair.isPresent()) {
+HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+List> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+partitionMapList.stream().forEach(partitionMap -> {
+  String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+  if (!partitionVisiteddMap.containsKey(partition)) {
+Option 
hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
+if (hoodieConsistentHashingMetadataOption.isPresent()) {
+  try {
+overWriteMetadata(table, 
hoodieConsistentHashingMetadataOption.get(), HoodieTimeline.INIT_INSTANT_TS + 
HASHING_METADATA_FILE_SUFFIX);
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+partitionVisiteddMap.put(partition, Boolean.TRUE);
+  }
+});
+  }
+});
+  }
+
+  private Option getOldestInstantToRetain(HoodieTable table) {
+try {
+  Option oldestInstantToRetainForClustering =
+  
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+  return oldestInstantToRetainForClustering;
+} catch (IOException e) {
+  LOG.error("Error while getting oldest instant to retain info: ", e);
+  return Option.empty();
+}
+  }
+
+  private boolean overWriteMetadata(HoodieTable table, 
HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {

Review Comment:
   ```suggestion
  private boolean overwriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
   ```
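
   The change is purely a casing fix: treating "overwrite" as a single word keeps the method name in standard lower camel case, matching the other helpers in this class.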



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175216695


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,62 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);

Review Comment:
   Could the oldest instant to retain be passed in as a parameter? Otherwise it is computed twice, which costs some extra work here, and `hoodieOldestReplaceInstantToKeep` can become inconsistent with `oldestInstantToRetainForClustering` in `getCommitInstantsToArchive` if the timeline changes between the two lookups.
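
   A minimal sketch of what the suggested signature change could look like; the parameter name and the call-site wiring from `HoodieTimelineArchiver` are assumptions for illustration, not part of the patch:

   ```java
   // Hypothetical sketch: reuse the oldest-instant-to-retain that the archiver already resolved,
   // instead of calling getOldestInstantToRetain(table) a second time inside this method.
   public void updateMetadata(HoodieTable table, Option<HoodieInstant> oldestReplaceInstantToKeep) {
     // filter the completed replace timeline against oldestReplaceInstantToKeep,
     // then overwrite the per-partition hashing metadata exactly as the current patch does
   }
   ```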



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175216695


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +279,62 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);

Review Comment:
   Could the oldest instant to retain be passed in as a parameter? Otherwise it is computed twice, which costs some extra work here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1175030462


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +278,46 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =

Review Comment:
   @rohan-uptycs, another question: in `HoodieTimelineArchiver`, the replacecommits after `oldestInstantToRetainForClustering` aren't archived, while the replacecommits before it will be archived. So do all completed replacecommits need to update the metadata, or only the unarchived replacecommits?
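
   A rough sketch of limiting the update to the replacecommits that are about to leave the active timeline; it reuses the retention boundary computed in the later revision of this method, so treat it as illustrative rather than a definitive answer:

   ```java
   // Sketch: limit the hashing-metadata sync to replacecommits older than the oldest instant
   // that clustering still needs to retain, i.e. the instants the archiver may remove next.
   try {
     Option<HoodieInstant> oldestToKeep = ClusteringUtils.getOldestInstantToRetainForClustering(
         table.getActiveTimeline(), table.getMetaClient());
     HoodieTimeline replaceCommitsToUpdate = table.getActiveTimeline().getCompletedReplaceTimeline()
         .filter(instant -> oldestToKeep
             .map(keep -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, keep.getTimestamp()))
             .orElse(true));
     // update the per-partition hashing metadata only for replaceCommitsToUpdate
   } catch (IOException e) {
     // if the boundary cannot be resolved, skip the sync rather than failing archival
   }
   ```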



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1174954973


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +278,46 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =

Review Comment:
   @rohan-uptycs, BTW, do replacecommits in the archived timeline also need to update the metadata?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-24 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1174878383


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +278,46 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =

Review Comment:
   @rohan-uptycs, `getCompletedReplaceTimeline()` returns completed replacecommits, which can be generated by both clustering operations and insert overwrite actions. Is that not a problem here?
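
   A small sketch of one possible guard, assuming that only clustering-produced replacecommits carry a clustering plan in their requested instant; whether that assumption fully covers insert overwrite is exactly the open question here:

   ```java
   // Inside the forEach over completed replacecommits: skip instants without a clustering plan
   // (e.g. replacecommits written by INSERT OVERWRITE), assuming they have no hashing metadata to sync.
   boolean fromClustering = ClusteringUtils.getClusteringPlan(
       table.getMetaClient(),
       HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp())).isPresent();
   if (!fromClustering) {
     return; // within the forEach lambda this acts like "continue"
   }
   ```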



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data

2023-04-23 Thread via GitHub


SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1174772541


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##
@@ -275,4 +278,46 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =

Review Comment:
   When the replacecommit comes from an `INSERT OVERWRITE` action, is there any problem here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org