yihua commented on a change in pull request #3991:
URL: https://github.com/apache/hudi/pull/3991#discussion_r751818466



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -274,6 +274,16 @@ public int countInstants() {
     return Option.fromJavaOptional(instants.stream().findFirst());
   }
 
+  @Override
+  public Option<HoodieInstant> firstInstant(String action, State state) {
+    for (HoodieInstant instant : instants) {
+      if (action.equals(instant.getAction()) && state.equals(instant.getState())) {
+        return Option.of(instant);
+      }
+    }

Review comment:
       Good call.  Fixed.
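
For reference, the first-match scan added in `firstInstant(action, state)` follows the standard early-return loop over the timeline. A minimal, self-contained sketch of the same pattern (plain Java, with a hypothetical `Instant` record standing in for Hudi's `HoodieInstant`, and `java.util.Optional` in place of Hudi's `Option`):

```java
import java.util.List;
import java.util.Optional;

public class FirstInstantSketch {
  // Hypothetical stand-in for HoodieInstant: just an action, a state, and a timestamp.
  record Instant(String action, String state, String timestamp) {}

  // Returns the first instant (in timeline order) matching both action and state,
  // or Optional.empty() if none matches -- mirroring the early-return loop in the diff.
  static Optional<Instant> firstInstant(List<Instant> instants, String action, String state) {
    for (Instant instant : instants) {
      if (action.equals(instant.action()) && state.equals(instant.state())) {
        return Optional.of(instant);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<Instant> timeline = List.of(
        new Instant("commit", "COMPLETED", "001"),
        new Instant("compaction", "REQUESTED", "002"),
        new Instant("compaction", "REQUESTED", "003"));
    System.out.println(firstInstant(timeline, "compaction", "REQUESTED")
        .map(Instant::timestamp).orElse("none")); // prints 002
  }
}
```

Because the loop returns on the first hit, the match with the smallest timeline position wins, which is what the callers below rely on to pick the earliest scheduled instant.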

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
##########
@@ -134,6 +139,25 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     SparkRDDWriteClient<HoodieRecordPayload> client =
        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
+    // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
+    // instant from the active timeline
+    if (cfg.compactionInstantTime == null) {

Review comment:
       Fixed.
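
The hunk is truncated at the null check, but the intended fallback (per the comment in the diff, and analogous to the clustering job later in this review) can be sketched as follows. This is a simplified, self-contained illustration with a hypothetical `Instant` record, not the actual `HoodieCompactor` code:

```java
import java.util.List;
import java.util.Optional;

public class CompactionInstantFallback {
  // Hypothetical stand-in for a scheduled instant on the timeline.
  record Instant(String action, String state, String timestamp) {}

  // If the user supplied --instant-time, use it; otherwise fall back to the
  // earliest REQUESTED compaction instant, or fail if none is scheduled.
  static String resolveInstantTime(String configured, List<Instant> timeline) {
    if (configured != null) {
      return configured;
    }
    Optional<Instant> first = timeline.stream()
        .filter(i -> "compaction".equals(i.action()) && "REQUESTED".equals(i.state()))
        .findFirst();
    return first.map(Instant::timestamp)
        .orElseThrow(() -> new IllegalStateException("There is no scheduled compaction in the table."));
  }

  public static void main(String[] args) {
    List<Instant> timeline = List.of(
        new Instant("commit", "COMPLETED", "001"),
        new Instant("compaction", "REQUESTED", "002"));
    System.out.println(resolveInstantTime(null, timeline)); // prints 002
  }
}
```

Failing fast when nothing is scheduled keeps the job from silently doing no work, which matches the exception thrown in the clustering hunk below.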

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
##########
@@ -249,4 +263,10 @@ private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
     return -1;
   }
 
+  private HoodieTableMetaClient getMetaClient() {
+    if (metaClient == null) {
+      metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
+    }
+    return metaClient;
+  }

Review comment:
       Got it.  Fixed.
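
The helper above is a plain lazy-initialization accessor: build the meta client on first use, then reuse the cached instance. The pattern in isolation, with a hypothetical `expensiveBuild()` standing in for the `HoodieTableMetaClient` builder chain:

```java
public class LazyMetaClient {
  private Object metaClient; // stands in for HoodieTableMetaClient
  int buildCount;            // instrumentation to show the build runs once

  // Hypothetical stand-in for the builder chain in the diff.
  private Object expensiveBuild() {
    buildCount++;
    return new Object();
  }

  // Lazily build on first call, then return the cached instance.
  Object getMetaClient() {
    if (metaClient == null) {
      metaClient = expensiveBuild();
    }
    return metaClient;
  }

  public static void main(String[] args) {
    LazyMetaClient job = new LazyMetaClient();
    Object a = job.getMetaClient();
    Object b = job.getMetaClient();
    System.out.println(a == b);         // prints true
    System.out.println(job.buildCount); // prints 1
  }
}
```

Note this unsynchronized form is only safe for single-threaded use, which is adequate for a driver-side CLI job like `HoodieClusteringJob`.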

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
##########
@@ -190,6 +190,20 @@ private String getSchemaFromLatestInstant() throws Exception {
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (cfg.clusteringInstantTime == null) {
+        // Instant time is not specified
+        // Find the earliest scheduled clustering instant for execution
+        Option<HoodieInstant> firstClusteringInstant =
+            getMetaClient().getActiveTimeline().firstInstant(
+                HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
+        if (firstClusteringInstant.isPresent()) {
+          cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled clustering instant which will be executed: "
+              + cfg.clusteringInstantTime);
+        } else {
+          throw new RuntimeException("There is no scheduled clustering in the table.");

Review comment:
       Sounds good. Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
