voonhous commented on code in PR #6566:
URL: https://github.com/apache/hudi/pull/6566#discussion_r962104203


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:
##########
@@ -215,82 +226,126 @@ protected Pair<CompletableFuture, ExecutorService> 
startService() {
       }, executor), executor);
     }
 
+    /**
+     * Follows the same execution methodology of HoodieFlinkCompactor, where 
only one clustering job
+     * is allowed to be executed at any point in time.
+     * <p>
+     * If there is an inflight clustering job, it will be rolled back and 
re-attempted.
+     * <p>
+     * A clustering plan will be generated if `schedule` is true.
+     *
+     * @throws Exception
+     * @see HoodieFlinkCompactor
+     */
     private void cluster() throws Exception {
       table.getMetaClient().reloadActiveTimeline();
 
-      // judges whether there are operations
-      // to compute the clustering instant time and exec clustering.
       if (cfg.schedule) {
+        // create a clustering plan on the timeline
         ClusteringUtil.validateClusteringScheduling(conf);
-        String clusteringInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
-        boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+        String clusteringInstantTime = cfg.clusteringInstantTime != null ? 
cfg.clusteringInstantTime
+            : HoodieActiveTimeline.createNewInstantTime();
+
+        LOG.info("Creating a clustering plan for instant [" + 
clusteringInstantTime + "]");
+        boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime,
+            Option.empty());
         if (!scheduled) {
           // do nothing.
           LOG.info("No clustering plan for this job");
+          executeDummyPipeline();
           return;
         }
         table.getMetaClient().reloadActiveTimeline();
       }
 
       // fetch the instant based on the configured execution sequence
-      List<HoodieInstant> instants = 
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
-          .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED).collect(Collectors.toList());
+      List<HoodieInstant> instants = 
ClusteringUtils.getPendingClusteringInstantTimes(
+          table.getMetaClient());
       if (instants.isEmpty()) {
         // do nothing.
-        LOG.info("No clustering plan scheduled, turns on the clustering plan 
schedule with --schedule option");
+        LOG.info(
+            "No clustering plan scheduled, turns on the clustering plan 
schedule with --schedule option");
+        executeDummyPipeline();
         return;
       }
 
-      HoodieInstant clusteringInstant = 
CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : 
instants.get(0);
+      HoodieInstant reqClusteringInstant;
+      if (cfg.clusteringInstantTime != null) {
+        List<HoodieInstant> reqHoodieInstant = instants
+            .stream()
+            .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))

Review Comment:
   Using this instead:
   
   ```java
   reqClusteringInstant = instants.stream()
               .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
               .findFirst()
               .orElseThrow(() -> new HoodieException("Clustering instant [" + 
cfg.clusteringInstantTime + "] not found"));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to