yihua commented on a change in pull request #3991: URL: https://github.com/apache/hudi/pull/3991#discussion_r751818466
########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java ########## @@ -274,6 +274,16 @@ public int countInstants() { return Option.fromJavaOptional(instants.stream().findFirst()); } + @Override + public Option<HoodieInstant> firstInstant(String action, State state) { + for (HoodieInstant instant : instants) { + if (action.equals(instant.getAction()) && state.equals(instant.getState())) { + return Option.of(instant); + } + } Review comment: Good call. Fixed. ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java ########## @@ -134,6 +139,25 @@ private int doCompact(JavaSparkContext jsc) throws Exception { String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props); + // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction + // instant from the active timeline + if (cfg.compactionInstantTime == null) { Review comment: Fixed. ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java ########## @@ -249,4 +263,10 @@ private int handleErrors(HoodieCommitMetadata metadata, String instantTime) { return -1; } + private HoodieTableMetaClient getMetaClient() { + if (metaClient == null) { + metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build(); + } + return metaClient; + } Review comment: Got it. Fixed. 
########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java ########## @@ -190,6 +190,20 @@ private String getSchemaFromLatestInstant() throws Exception { private int doCluster(JavaSparkContext jsc) throws Exception { String schemaStr = getSchemaFromLatestInstant(); try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { + if (cfg.clusteringInstantTime == null) { + // Instant time is not specified + // Find the earliest scheduled clustering instant for execution + Option<HoodieInstant> firstClusteringInstant = + getMetaClient().getActiveTimeline().firstInstant( + HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED); + if (firstClusteringInstant.isPresent()) { + cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp(); + LOG.info("Found the earliest scheduled clustering instant which will be executed: " + + cfg.clusteringInstantTime); + } else { + throw new RuntimeException("There is no scheduled clustering in the table."); Review comment: Sg. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org