codope commented on a change in pull request #3991:
URL: https://github.com/apache/hudi/pull/3991#discussion_r751804550



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
##########
@@ -249,4 +263,10 @@ private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
     return -1;
   }
 
+  private HoodieTableMetaClient getMetaClient() {
+    if (metaClient == null) {
+      metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
+    }
+    return metaClient;
+  }

Review comment:
       Consider moving this to `UtilHelpers` so that it can be reused in `HoodieCompactor` as well.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
##########
@@ -190,6 +190,20 @@ private String getSchemaFromLatestInstant() throws Exception {
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (cfg.clusteringInstantTime == null) {
+        // Instant time is not specified
+        // Find the earliest scheduled clustering instant for execution
+        Option<HoodieInstant> firstClusteringInstant =
+            getMetaClient().getActiveTimeline().firstInstant(
+                HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
+        if (firstClusteringInstant.isPresent()) {
+          cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled clustering instant which will be executed: "
+              + cfg.clusteringInstantTime);
+        } else {
+          throw new RuntimeException("There is no scheduled clustering in the table.");

Review comment:
       How about throwing the custom `HoodieClusteringException` here (and `HoodieCompactionException` in `HoodieCompactor`) instead of a plain `RuntimeException`?
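
       A rough sketch of the idea, not the actual change. `ClusteringException` below is a hypothetical stand-in for `HoodieClusteringException` so that the snippet compiles without Hudi on the classpath, and `resolveInstantTime` is an illustrative helper name, not something in the PR. The real exception class and its constructors should be checked before use.

```java
import java.util.Optional;

public class ClusteringExceptionSketch {
  // Hypothetical stand-in for HoodieClusteringException (not the real class).
  public static class ClusteringException extends RuntimeException {
    public ClusteringException(String message) {
      super(message);
    }
  }

  // Illustrative helper: returns the instant time to execute, or fails with a
  // domain-specific exception when no clustering has been scheduled.
  public static String resolveInstantTime(Optional<String> earliestScheduled) {
    return earliestScheduled.orElseThrow(
        () -> new ClusteringException("There is no scheduled clustering in the table."));
  }
}
```

       Callers can then catch the specific exception type rather than every `RuntimeException`.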

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
##########
@@ -134,6 +139,25 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
+    // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
+    // instant from the active timeline
+    if (cfg.compactionInstantTime == null) {

Review comment:
       Let's use `StringUtils.isNullOrEmpty` here, so that an empty `--instant-time` value is handled the same way as a missing one.
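
       To illustrate the difference from a plain `== null` check, here is a minimal self-contained sketch. The local `isNullOrEmpty` is a stand-in for Hudi's `StringUtils.isNullOrEmpty`, and `needsLookup` is a hypothetical helper name used only for this example.

```java
public class InstantTimeCheckSketch {
  // Local stand-in for Hudi's StringUtils.isNullOrEmpty, so the snippet is
  // self-contained.
  public static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Hypothetical helper: true when no usable instant time was supplied, i.e.
  // the job should fall back to the earliest scheduled instant.
  public static boolean needsLookup(String instantTime) {
    return isNullOrEmpty(instantTime);
  }
}
```

       With `== null` alone, an empty string would skip the lookup and be used as the instant time.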

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -274,6 +274,16 @@ public int countInstants() {
     return Option.fromJavaOptional(instants.stream().findFirst());
   }
 
+  @Override
+  public Option<HoodieInstant> firstInstant(String action, State state) {
+    for (HoodieInstant instant : instants) {
+      if (action.equals(instant.getAction()) && state.equals(instant.getState())) {
+        return Option.of(instant);
+      }
+    }

Review comment:
       Consider using `stream()` and `filter()` instead of the explicit loop, to be in line with the other APIs in the timeline.
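
       Roughly along these lines, as a sketch only. `Instant` is a simplified stand-in for `HoodieInstant` (plain strings instead of the real action and state types), and `java.util.Optional` is used in place of Hudi's `Option` so that the snippet runs on its own.

```java
import java.util.List;
import java.util.Optional;

public class FirstInstantSketch {
  // Simplified stand-in for HoodieInstant; the fields are assumptions made
  // for this example only.
  public static final class Instant {
    public final String action;
    public final String state;
    public final String timestamp;

    public Instant(String action, String state, String timestamp) {
      this.action = action;
      this.state = state;
      this.timestamp = timestamp;
    }
  }

  // Returns the first instant, in list order, that matches both the action
  // and the state, or an empty Optional when none matches.
  public static Optional<Instant> firstInstant(List<Instant> instants, String action, String state) {
    return instants.stream()
        .filter(i -> action.equals(i.action) && state.equals(i.state))
        .findFirst();
  }
}
```

       In `HoodieDefaultTimeline` the result would presumably be wrapped with `Option.fromJavaOptional(...)`, as the existing method just above the new one already does.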



