lanyuanxiaoyao commented on code in PR #5677:
URL: https://github.com/apache/hudi/pull/5677#discussion_r883205902


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java:
##########
@@ -218,74 +228,112 @@ private void compact() throws Exception {
       }
 
       // fetch the instant based on the configured execution sequence
-      HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-      Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
-      if (!requested.isPresent()) {
+      HoodieTimeline timeline = table.getActiveTimeline();
+      List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
+          .select(timeline.filterPendingCompactionTimeline(), cfg);
+      if (requested.isEmpty()) {
         // do nothing.
         LOG.info("No compaction plan scheduled, turns on the compaction plan 
schedule with --schedule option");
         return;
       }
 
-      String compactionInstantTime = requested.get().getTimestamp();
-
-      HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-      if (timeline.containsInstant(inflightInstant)) {
-        LOG.info("Rollback inflight compaction instant: [" + 
compactionInstantTime + "]");
-        table.rollbackInflightCompaction(inflightInstant);
-        table.getMetaClient().reloadActiveTimeline();
-      }
+      List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      compactionInstantTimes.forEach(timestamp -> {
+        HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
+        if (timeline.containsInstant(inflightInstant)) {
+          LOG.info("Rollback inflight compaction instant: [" + timestamp + 
"]");
+          table.rollbackInflightCompaction(inflightInstant);
+          table.getMetaClient().reloadActiveTimeline();
+        }
+      });
 
-      // generate compaction plan
+      // generate timestamp and compaction plan pair
       // should support configurable commit metadata
-      HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-          table.getMetaClient(), compactionInstantTime);
-
-      if (compactionPlan == null || (compactionPlan.getOperations() == null)
-          || (compactionPlan.getOperations().isEmpty())) {
+      List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
+          .map(timestamp -> {
+            try {
+              return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
+            } catch (IOException e) {
+              throw new HoodieException(e);
+            }
+          })
+          // reject empty compaction plan
+          .filter(pair -> !(pair.getRight() == null
+              || pair.getRight().getOperations() == null
+              || pair.getRight().getOperations().isEmpty()))
+          .collect(Collectors.toList());
+
+      if (compactionPlans.isEmpty()) {
         // No compaction plan, do nothing and return.
-        LOG.info("No compaction plan for instant " + compactionInstantTime);
+        LOG.info("No compaction plan for instant " + String.join(",", 
compactionInstantTimes));
         return;
       }
 
-      HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+      List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-      if (!pendingCompactionTimeline.containsInstant(instant)) {
-        // this means that the compaction plan was written to auxiliary path(.tmp)
-        // but not the meta path(.hoodie), this usually happens when the job crush
-        // exceptionally.
-
-        // clean the compaction plan in auxiliary path and cancels the compaction.
-
-        LOG.warn("The compaction plan was fetched through the auxiliary 
path(.tmp) but not the meta path(.hoodie).\n"
-            + "Clean the compaction plan in auxiliary path and cancels the 
compaction");
-        CompactionUtil.cleanInstant(table.getMetaClient(), instant);
-        return;
+      for (HoodieInstant instant : instants) {
+        if (!pendingCompactionTimeline.containsInstant(instant)) {
+          // this means that the compaction plan was written to the auxiliary path(.tmp)
+          // but not the meta path(.hoodie); this usually happens when the job crashes
+          // exceptionally.
+          // clean the compaction plan in the auxiliary path and cancel the compaction.
+          LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+              + "Clean the compaction plan in auxiliary path and cancels the compaction");
+          CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+          return;
+        }
       }
 
       // get compactionParallelism.
      int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
-          ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+          ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
+          : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
 
-      LOG.info("Start to compaction for instant " + compactionInstantTime);
+      LOG.info("Start to compaction for instant " + compactionInstantTimes);
 
       // Mark instant as compaction inflight
-      table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+      for (HoodieInstant instant : instants) {
+        table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+      }
       table.getMetaClient().reloadActiveTimeline();
 
-      env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
-          .name("compaction_source")
-          .uid("uid_compaction_source")
-          .rebalance()
+      // use side-output to route operations that belong to the same plan into the same stream;
+      // keyBy() cannot guarantee that operations from different plans end up in different streams
+      Pair<String, HoodieCompactionPlan> firstPlan = compactionPlans.get(0);
+      DataStream<CompactionPlanEvent> source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft()))
+          .name("compaction_source " + firstPlan.getLeft())
+          .uid("uid_compaction_source " + firstPlan.getLeft());
+      if (compactionPlans.size() > 1) {
+        for (Pair<String, HoodieCompactionPlan> pair : compactionPlans.subList(1, compactionPlans.size())) {
+          source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
+              .name("compaction_source " + pair.getLeft())
+              .uid("uid_compaction_source_" + pair.getLeft()));
+        }
+      }
+
+      SingleOutputStreamOperator<Void> operator = source.rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
               new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(compactionParallelism)
-          .addSink(new CompactionCommitSink(conf))
-          .name("clean_commits")
-          .uid("uid_clean_commits")
+          .process(new ProcessFunction<CompactionCommitEvent, Void>() {
+            @Override
+            public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
+              context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+            }
+          })
+          .name("group_by_compaction_plan")
+          .uid("uid_group_by_compaction_plan")
           .setParallelism(1);
 
-      env.execute("flink_hudi_compaction_" + compactionInstantTime);
+      compactionPlans.forEach(pair ->
+          operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+              .addSink(new CompactionCommitSink(conf))

Review Comment:
   Yes, CompactionCommitSink is designed to handle the status of a single compaction plan: it collects each compaction operation's status until every operation is complete, and then marks the compaction plan as complete. Second, a file id that needs to be compacted cannot be scheduled into different plans, which means the compaction of one plan will not interfere with another plan. So we only need to make sure that operations belonging to the same plan land in the same CompactionCommitSink.
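   As a rough illustration of that bookkeeping, here is a minimal sketch with hypothetical names (`PlanCommitBuffer`, `onOperationComplete`), not the actual CompactionCommitSink code: buffer each operation's completion and commit only once the whole plan has reported.
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch of the per-plan bookkeeping described above.
   class PlanCommitBuffer {
     // instant time -> task ids that have reported completion
     private final Map<String, Set<Integer>> completed = new HashMap<>();
     // instant time -> number of operations the plan contains
     private final Map<String, Integer> expected = new HashMap<>();

     void registerPlan(String instant, int numOperations) {
       expected.put(instant, numOperations);
     }

     /** Record one operation's completion; true once every operation of the plan is done. */
     boolean onOperationComplete(String instant, int taskId) {
       completed.computeIfAbsent(instant, k -> new HashSet<>()).add(taskId);
       return completed.get(instant).size() == expected.getOrDefault(instant, Integer.MAX_VALUE);
     }
   }
   ```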
   This is why I use side-output to route operations that belong to the same compaction plan into the same sink.
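   For reference, a self-contained sketch of that side-output routing pattern, simplified to String events: the tag ids stand in for compaction instant times, and the printing sinks stand in for CompactionCommitSink. Each per-plan sink only ever sees its own plan's events.
   ```java
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.ProcessFunction;
   import org.apache.flink.util.Collector;
   import org.apache.flink.util.OutputTag;

   public class SideOutputRoutingSketch {
     public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       // Interleaved events from two plans; the prefix before ':' plays the
       // role of the compaction instant time.
       SingleOutputStreamOperator<Void> routed = env
           .fromElements("001:file-a", "001:file-b", "002:file-c")
           .process(new ProcessFunction<String, Void>() {
             @Override
             public void processElement(String event, Context ctx, Collector<Void> out) {
               String instant = event.split(":")[0];
               // emit the event to the side output named after its plan
               ctx.output(new OutputTag<>(instant, TypeInformation.of(String.class)), event);
             }
           })
           .setParallelism(1);

       // One sink per plan, attached to that plan's side output.
       for (String instant : new String[] {"001", "002"}) {
         routed.getSideOutput(new OutputTag<>(instant, TypeInformation.of(String.class)))
             .print("plan-" + instant);
       }

       env.execute("side_output_routing_sketch");
     }
   }
   ```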



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
