SteNicholas commented on code in PR #8503: URL: https://github.com/apache/hudi/pull/8503#discussion_r1175223280
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java: ########## @@ -275,4 +279,62 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) { throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); } } + + /** + * Update default metadata file(00000000000000.hashing_meta) with the latest committed metadata file so that default file will be in sync + * with latest commit. + * + * @param table + */ + public void updateMetadata(HoodieTable table) { + Map<String, Boolean> partitionVisiteddMap = new HashMap<>(); + Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table); + // Update metadata for replace commit which are going to get archived. + HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline().filter(instant -> + hoodieOldestReplaceInstantToKeep.map(replaceInstantToKeep -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, replaceInstantToKeep.getTimestamp())).orElse(true)); + hoodieTimeline.getInstants().forEach(instant -> { + Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair = + ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instant.getTimestamp())); + if (instantPlanPair.isPresent()) { + HoodieClusteringPlan plan = instantPlanPair.get().getRight(); + List<Map<String, String>> partitionMapList = plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList()); + partitionMapList.stream().forEach(partitionMap -> { + String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY); + if (!partitionVisiteddMap.containsKey(partition)) { + Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadata(table, partition); + if (hoodieConsistentHashingMetadataOption.isPresent()) { + try { + overWriteMetadata(table, hoodieConsistentHashingMetadataOption.get(), HoodieTimeline.INIT_INSTANT_TS + HASHING_METADATA_FILE_SUFFIX); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + partitionVisiteddMap.put(partition, Boolean.TRUE); + } + }); + } + }); + } + + private Option<HoodieInstant> getOldestInstantToRetain(HoodieTable table) { + try { + Option<HoodieInstant> oldestInstantToRetainForClustering = + ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient()); + return oldestInstantToRetainForClustering; + } catch (IOException e) { + LOG.error("Error while getting oldest instant to retain info: ", e); + return Option.empty(); + } + } + + private boolean overWriteMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, String fileName) throws IOException { + HoodieWrapperFileSystem fs = table.getMetaClient().getFs(); + Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath()); + Path fullPath = new Path(dir, fileName); + FSDataOutputStream fsOut = fs.create(fullPath, true); Review Comment: ```suggestion try (FSDataOutputStream fsOut = fs.create(fullPath, true)) { byte[] bytes = metadata.toBytes(); fsOut.write(bytes); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org