[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185997301

## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java: ##

@@ -149,6 +154,59 @@ public void testResizing(boolean isSplit) throws IOException {
     });
   }

+  /***

Review Comment:
```suggestion
  /**
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996892

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

@@ -271,8 +327,114 @@ public Option getRecordLocation(HoodieKey key) {
       }
       LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
-          + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
+          + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder; the consistent hashing metadata reader will use it to
+   * identify the latest committed file, which will have the updated commit metadata.
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String hoodieInstant) {
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+        ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+    if (!instantPlanPair.isPresent()) {
+      return;
+    }
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    List<Map<String, String>> partitionMapList = plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+    partitionMapList.stream().forEach(partitionMap -> {
+      String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      Path metadataPartitionPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
+      Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + HASHING_METADATA_FILE_SUFFIX);
+      try {
+        if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+          createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("exception while committing hashing metadata for path " + metadataFilePath, e);
+      }
+    });
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after the post-commit clustering operation.
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    String metadata = "";
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from the given file.
+   * @param table
+   * @param metaFile
+   * @return
+   */
+  private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+    try {
+      if (metaFile == null) {
+        return Option.empty();
+      }
+      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+    } catch (FileNotFoundException e) {
+      return Option.empty();
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing metadata, for path: " + metaFile.getPath().getName(), e);
+      throw new HoodieIndexException("Error while loading hashing metadata", e);
+    }
+  }
+
+  /***
+   * COMMIT MARKER RECOVERY JOB.
+   * If a particular hashing metadata file doesn't have a commit marker, there could be a case where clustering is done but the post-commit marker
+   * creation operation failed. In this case, this method will check the file group id from consistent hashing metadata against the storage base file group ids.
+   * If one of the file groups matches, we can conclude that this is the latest metadata file.
+   * Note: we will end up calling this method if there is no marker file and no replace commit on the active timeline. If the replace commit is not present on
+   * the active timeline, that means the old file group ids before the clustering operation got cleaned and only the new file group ids of the current clustering operation
+   * are present on disk.
+   * @param table

Review Comment:
Add comments for all parameters and the return value.
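The marker naming the comments above describe ("hoodieinstant.commit" placed next to the hashing metadata file) can be sketched in plain Java. This is a hedged illustration only, not Hudi's actual API: the class name, the suffix values, and `timestampFromFile` are assumptions standing in for Hudi's real constants and helpers.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch of the commit-marker naming convention discussed in the review:
// a hashing metadata file "<instant><HASHING_METADATA_FILE_SUFFIX>" is
// considered committed once "<instant>.commit" exists beside it.
public class HashingMetadataMarker {
  // Suffix values are assumptions for illustration; the real constants live in Hudi.
  static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
  static final String HASHING_METADATA_COMMIT_FILE_SUFFIX = ".commit";

  /** Strips the metadata suffix to recover the instant timestamp. */
  static String timestampFromFile(String fileName) {
    return fileName.substring(0, fileName.length() - HASHING_METADATA_FILE_SUFFIX.length());
  }

  /** Marker path: same directory as the metadata file, named "<instant>.commit". */
  static Path markerPathFor(Path metadataFile) {
    String instant = timestampFromFile(metadataFile.getFileName().toString());
    return metadataFile.getParent().resolve(instant + HASHING_METADATA_COMMIT_FILE_SUFFIX);
  }

  public static void main(String[] args) {
    Path meta = Paths.get("/table/partition/20230501120000.hashing_meta");
    // The reader can treat the metadata file as committed iff this path exists.
    System.out.println(markerPathFor(meta));
  }
}
```

Because the marker carries no payload, the reader only has to test existence of the derived path to decide which metadata file is the latest committed one.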
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996438

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

+  /***
+   * Load consistent hashing metadata from the given file.
+   * @param table

Review Comment:
Add comments for all parameters and the return value.
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996328

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

+  /***
+   * Load consistent hashing metadata from given file

Review Comment:
```suggestion
   * Loads consistent hashing metadata of table from the given meta file.
```
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185995971

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

+  private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    String metadata = "";
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));

Review Comment:
```suggestion
    FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
```
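The quoted `createCommitMarker` boils down to an idempotent "create an empty file if absent" step. A minimal sketch using `java.nio.file` instead of Hudi's `HoodieWrapperFileSystem`/`FileIOUtils` (an assumption made purely to keep the example self-contained; the empty-bytes content is exactly what the `StringUtils.EMPTY_STRING` suggestion makes explicit):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Idempotent empty-marker creation: the marker's name carries the signal,
// its content is deliberately empty.
public class CommitMarkerWriter {
  static void createMarkerIfAbsent(Path markerPath) throws IOException {
    if (Files.exists(markerPath)) {
      return; // already committed; repeated calls must be no-ops
    }
    Files.createDirectories(markerPath.getParent());
    Files.write(markerPath, new byte[0]); // zero-byte marker file
  }

  public static void main(String[] args) throws IOException {
    Path marker = Files.createTempDirectory("markers").resolve("20230501120000.commit");
    createMarkerIfAbsent(marker);
    createMarkerIfAbsent(marker); // second call returns early
    System.out.println(Files.size(marker)); // 0
  }
}
```

The early-return on an existing marker is what makes post-commit retries safe: re-running the commit path after a partial failure cannot corrupt an already-written marker.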
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185995107

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

+  /***
+   * Create commit marker corresponding to hashing metadata file after post commit clustering operation
+   * @param table

Review Comment:
Add comments for all parameters.
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185994632

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ##

+  /***
+   * Create commit marker corresponding to hashing metadata file after post commit clustering operation

Review Comment:
```suggestion
   * Creates commit marker corresponding to hashing metadata file after post commit clustering operation.
```
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185994125

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java: ##

@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType operationType) {
   public void close() {
   }

+  /***
+   * Update index metadata
+   * @param table
+   * @param hoodieInstant

Review Comment:
```suggestion
   * @param hoodieInstant The instant to commit.
```
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185993420

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java: ##

@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType operationType) {
   public void close() {
   }

+  /***
+   * Update index metadata

Review Comment:
```suggestion
   * Updates index metadata of the given table and instant if needed.
```
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185993910

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java: ##

@@ -154,6 +154,14 @@ public boolean requiresTagging(WriteOperationType operationType) {
   public void close() {
   }

+  /***
+   * Update index metadata
+   * @param table

Review Comment:
```suggestion
   * @param table The committed table.
```
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185992732

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java: ##

@@ -513,7 +513,6 @@ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     if (config.isMetaserverEnabled()) {
       return Stream.empty();
     }
-

Review Comment:
No update here. Please revert.
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1180044369

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java: ##

@@ -509,7 +509,15 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
   }

   private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
-    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+    List<HoodieInstant> commitInstantsToArchive = getCommitInstantsToArchive().collect(Collectors.toList());
+    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), commitInstantsToArchive.stream());
+    HoodieInstant hoodieOldestInstantToArchive = commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> maxInstant.getTimestamp())).orElse(null);
+    /**
+     * if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
+     */
+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));

Review Comment:
@rohan-uptycs, makes sense to me.
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8503: [HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data
SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1179114163

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java: ##

+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));

Review Comment:
@rohan-uptycs, when `postCommit` executes successfully, `updateMetadata` could be invoked.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1178579855

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
   private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     ...
+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));
```

Review Comment: @rohan-uptycs, makes sense to me from an interface point of view.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1177521748

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
   private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     ...
+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));
```

Review Comment: @rohan-uptycs, `getInstantsToArchive` is also only used to get the instants to archive; therefore, by design, the update behavior shouldn't be invoked in this method.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1177385692

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
+    HoodieInstant hoodieOldestInstantToArchive = commitInstantsToArchive.stream().max(Comparator.comparing(maxInstant -> maxInstant.getTimestamp())).orElse(null);
+    /**
+     * if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
+     */
```

Review Comment: Remove lines 515 and 517 (the `/**` and `*/` delimiter lines).
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1177385296

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
   private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     ...
+    if (hoodieOldestInstantToArchive != null) {
+      table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));
```

Review Comment: I don't think invoking `updateMetadata` in `getInstantsToArchive` makes sense.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1177384033

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
+    /**
+     * if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
```

Review Comment:
```suggestion
    // if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
```
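The pattern under review here, `Stream.max(...).orElse(null)` followed by an explicit null check, can also be written without the nullable local by keeping the `Optional`. A minimal stdlib sketch of that alternative (the `Instant` record and the print call are stand-ins for `HoodieInstant` and the index update, not Hudi API):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class ArchiveSketch {
    // Stand-in for HoodieInstant: only the timestamp matters for this sketch.
    record Instant(String timestamp) {}

    // Highest timestamp among the commit instants selected for archival,
    // mirroring the diff's max(Comparator.comparing(...)) call, but keeping
    // the result as an Optional instead of orElse(null).
    static Optional<Instant> latestInstantToArchive(List<Instant> commits) {
        return commits.stream().max(Comparator.comparing(Instant::timestamp));
    }

    public static void main(String[] args) {
        List<Instant> toArchive = List.of(new Instant("001"), new Instant("003"), new Instant("002"));
        // ifPresent replaces the explicit `!= null` branch: when nothing is
        // being archived the Optional is empty and no metadata update runs.
        latestInstantToArchive(toArchive)
            .ifPresent(i -> System.out.println("update metadata up to " + i.timestamp()));
    }
}
```

With an empty input list the `Optional` is empty and `ifPresent` simply skips the update, which is exactly the behavior the diff's comment describes.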
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175964368

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:

```java
@@ -441,6 +441,8 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
     Option<HoodieInstant> oldestInstantToRetainForClustering =
         ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+    table.getIndex().updateMetadata(table);
+
```

Review Comment: @rohan-uptycs, could `updateMetadata` be invoked after archiving, or by the caller of `getCommitInstantsToArchive`? IMO, `getCommitInstantsToArchive` should only get the commit instants to archive, without any update behavior. Therefore, `updateMetadata` should not be triggered in `getCommitInstantsToArchive`.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175229521

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
@@ -275,4 +279,65 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }

+  /**
+   * Update default metadata file(00.hashing_meta) with the latest committed metadata file so that default file will be in sync
+   * with latest commit.
+   *
+   * @param table
+   */
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);
+    // Update metadata for replace commit which are going to get archived.
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant ->
+        hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, replaceInstantToKeep.getTimestamp())).orElse(true));
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+          ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp()));
+      if (instantPlanPair.isPresent()) {
+        HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+        List<Map<String, String>> partitionMapList = plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+        partitionMapList.stream().forEach(partitionMap -> {
+          String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+          if (!partitionVisiteddMap.containsKey(partition)) {
+            Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
+            if (hoodieConsistentHashingMetadataOption.isPresent()) {
+              try {
+                overWriteMetadata(table, hoodieConsistentHashingMetadataOption.get(),
+                    HoodieTimeline.INIT_INSTANT_TS + HASHING_METADATA_FILE_SUFFIX);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+            partitionVisiteddMap.put(partition, Boolean.TRUE);
+          }
+        });
+      }
+    });
+  }
+
+  private Option<HoodieInstant> getOldestInstantToRetain(HoodieTable table) {
+    try {
+      Option<HoodieInstant> oldestInstantToRetainForClustering =
+          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+      return oldestInstantToRetainForClustering;
+    } catch (IOException e) {
+      LOG.error("Error while getting oldest instant to retain info: ", e);
+      return Option.empty();
+    }
+  }
+
+  private boolean overwriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, fileName);
+    try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
+      byte[] bytes = metadata.toBytes();
+      fsOut.write(bytes);
+    }
+    byte[] bytes = metadata.toBytes();
```

Review Comment: You need to remove lines 338-340.
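The filter in `updateMetadata` above keeps only the completed replace instants strictly before the oldest instant to retain, and falls back to "everything is eligible" when there is no such cutoff (`orElse(true)`). A minimal sketch of that predicate with stdlib types (the `Instant` record and a plain lexicographic compare stand in for `HoodieInstant` and `HoodieTimeline.compareTimestamps`; not Hudi API):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class TimelineFilterSketch {
    // Stand-in for HoodieInstant.
    record Instant(String timestamp) {}

    // Mirrors compareTimestamps(instant, LESSER_THAN, cutoff): keep commits
    // whose timestamp sorts strictly before the retained instant; with no
    // cutoff, every completed replace commit passes the filter.
    static List<Instant> replaceCommitsToUpdate(List<Instant> completedReplaceCommits,
                                                Optional<Instant> oldestToRetain) {
        return completedReplaceCommits.stream()
                .filter(i -> oldestToRetain
                        .map(keep -> i.timestamp().compareTo(keep.timestamp()) < 0)
                        .orElse(true))
                .collect(Collectors.toList());
    }
}
```

So only the replace commits that are about to be archived (those older than the retention cutoff) trigger a metadata rewrite, which matches the diff's inline comment.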
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175224217

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  private boolean overWriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, fileName);
+    FSDataOutputStream fsOut = fs.create(fullPath, true);
+    byte[] bytes = metadata.toBytes();
+    fsOut.write(bytes);
+    fsOut.close();
```

Review Comment: After the above suggestion is applied, `fsOut.close()` will be invoked automatically, as in a finally block.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175223280

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  private boolean overWriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, fileName);
+    FSDataOutputStream fsOut = fs.create(fullPath, true);
```

Review Comment:
```suggestion
    try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
      byte[] bytes = metadata.toBytes();
      fsOut.write(bytes);
    }
```
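The suggested `try`-with-resources form guarantees the output stream is closed even when `write` throws. The same idiom with `java.nio` standing in for `HoodieWrapperFileSystem` and `FSDataOutputStream` (a sketch of the pattern, not the Hudi implementation):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class OverwriteSketch {
    // Overwrite (create-or-truncate) the metadata file. The stream's close()
    // runs automatically at the end of the try block, like an implicit
    // finally, so no manual fsOut.close() call is needed.
    static void overwriteMetadata(Path fullPath, byte[] bytes) throws IOException {
        try (OutputStream fsOut = Files.newOutputStream(fullPath)) {
            fsOut.write(bytes);
        }
    }
}
```

`Files.newOutputStream` defaults to create-or-truncate, matching the overwrite semantics of `fs.create(fullPath, true)` in the diff.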
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175220178

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  private boolean overWriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
```

Review Comment:
```suggestion
  private boolean overwriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException {
```
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175216695

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);
```

Review Comment: Could the oldest instant to retain be obtained from an interface parameter? Otherwise, the oldest instant is computed twice, which costs some performance here, and `hoodieOldestReplaceInstantToKeep` can become inconsistent with `oldestInstantToRetainForClustering` in `getCommitInstantsToArchive` when the timeline changes.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175216695

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);
```

Review Comment: Could the oldest instant to retain be obtained from an interface parameter? Otherwise, the oldest instant is computed twice, which costs some performance here.
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175030462

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
```

Review Comment: @rohan-uptycs, another question: in `HoodieTimelineArchiver`, the replacecommits after `oldestInstantToRetainForClustering` aren't archived, but the replacecommits before `oldestInstantToRetainForClustering` will be archived. Therefore, do all completed replacecommits need to update the metadata, or only the unarchived replacecommits?
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1174954973

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
```

Review Comment: @rohan-uptycs, BTW, do the replacecommits in the archived timeline need to update the metadata?
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1174878383

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
```

Review Comment: @rohan-uptycs, `getCompletedReplaceTimeline()` returns the completed replacecommits, which can be generated both by a clustering operation and by an insert overwrite action. Is there no problem with that?
SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1174772541

## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:

```java
+  public void updateMetadata(HoodieTable table) {
+    Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
+    HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
+    hoodieTimeline.getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
```

Review Comment: When the replacecommit comes from an `INSERT OVERWRITE` action, is there any problem here?