[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-08-01 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1281414915


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -265,11 +267,36 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
       cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
       LOG.info("Found the earliest scheduled compaction instant which will be executed: "
           + cfg.compactionInstantTime);
-    } else {
-      LOG.info("There is no scheduled compaction in the table.");
-      return 0;
+    }
+
+    // update cfg.compactionInstantTime if finding an expired instant
+    List<HoodieInstant> inflightInstants = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline().filterInflights().getInstants();
+    List<String> expiredInstants = inflightInstants.stream().filter(instant -> {
+      try {
+        return client.getHeartbeatClient().isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        LOG.info("Failed to check heartbeat for instant " + instant);
+      }
+      return false;
+    }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    if (!expiredInstants.isEmpty()) {
+      cfg.compactionInstantTime = expiredInstants.get(0);
+      LOG.info("Found expired compaction instant, update the earliest scheduled compaction instant which will be executed: "
+          + cfg.compactionInstantTime);
     }
   }
+
+  // do nothing if cfg.compactionInstantTime still is null
+  if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
+    LOG.info("There is no scheduled compaction in the table.");
+    return 0;
+  }
+
+  // start a heartbeat for the instant
+  client.getHeartbeatClient().start(cfg.compactionInstantTime);
+

Review Comment:
   Did you test it manually offline for multiple compactors?
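   The expired-instant selection in the hunk quoted above can be sketched as a minimal, self-contained example; the timeout constant and the map of instants are hypothetical stand-ins, not Hudi's real API:

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class ExpiredInstantDemo {
     // Hypothetical heartbeat timeout; Hudi reads this from the write config.
     static final long HEARTBEAT_TIMEOUT_MS = 60_000L;

     // A heartbeat is expired when no beat has been recorded within the timeout.
     static boolean isHeartbeatExpired(long lastHeartbeatMs, long nowMs) {
       return nowMs - lastHeartbeatMs > HEARTBEAT_TIMEOUT_MS;
     }

     public static void main(String[] args) {
       long now = 1_000_000L;
       // instant timestamp -> last heartbeat time (stand-in for inflight compaction instants)
       Map<String, Long> inflight = new LinkedHashMap<>();
       inflight.put("20230801120000", now - 120_000L); // heartbeat long gone: expired
       inflight.put("20230801123000", now - 10_000L);  // recent heartbeat: still alive
       List<String> expired = inflight.entrySet().stream()
           .filter(e -> isHeartbeatExpired(e.getValue(), now))
           .map(Map.Entry::getKey)
           .collect(Collectors.toList());
       // Like the patch: retry the first expired instant, if any.
       String toRetry = expired.isEmpty() ? null : expired.get(0);
       System.out.println(toRetry); // prints 20230801120000
     }
   }
   ```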



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-31 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1278975610


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##

Review Comment:
   I mean the original try-finally block closes the write client after the whole compaction/clean finishes; the new try-and-close seems to do the same thing?






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-30 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1278770289


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##

Review Comment:
   From the code, I didn't see the difference.






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-29 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1278482740


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##

Review Comment:
   The try-finally clause would close the client automatically?






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-28 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1277329137


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##

Review Comment:
   The heartbeat was previously used for lazy cleaning for multi-writers:
   
   ```java
   if (config.getFailedWritesCleanPolicy().isLazy()) {
     this.heartbeatClient.start(instantTime);
   }
   ```
   
   Now it is used for table service rollback, which makes sense to me; we just need to ensure the heartbeat client stays alive during the whole lifecycle of the compaction job.
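   A minimal sketch of keeping the heartbeat alive for the whole compaction lifecycle via try/finally; the `HeartbeatClient` here is a hypothetical stand-in, not Hudi's class:

   ```java
   public class HeartbeatLifecycleDemo {
     // Hypothetical stand-in for Hudi's heartbeat client: just tracks
     // whether a heartbeat is currently running for an instant.
     static class HeartbeatClient {
       boolean running = false;
       void start(String instantTime) { running = true; }
       void stop(String instantTime)  { running = false; }
     }

     // Stand-in compaction: reports whether the heartbeat covered it.
     static boolean compact(HeartbeatClient hb) {
       return hb.running;
     }

     public static void main(String[] args) {
       HeartbeatClient hb = new HeartbeatClient();
       String instant = "20230801120000";
       boolean coveredByHeartbeat;
       hb.start(instant);
       try {
         coveredByHeartbeat = compact(hb);
       } finally {
         hb.stop(instant); // stop the heartbeat even if compaction throws
       }
       System.out.println(coveredByHeartbeat + " " + hb.running); // prints "true false"
     }
   }
   ```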






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-26 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1274711375


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -101,6 +104,12 @@ public static class Config implements Serializable {
     public String runningMode = null;
     @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false)
     public String strategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();
+    @Parameter(names = {"--job-max-processing-time-ms", "-mt"}, description = "Take effect when using --mode/-m execute or scheduleAndExecute. "
+        + "If maxProcessingTimeMs passed but compaction job is still unfinished, hoodie would consider this job as failed and relaunch.")
+    public long maxProcessingTimeMs = 0;
+    @Parameter(names = {"--retry-last-failed-compaction-job", "-rc"}, description = "Take effect when using --mode/-m execute or scheduleAndExecute. "
Review Comment:
   You can actually check the heartbeat of the instant; if the heartbeat has timed out, we can roll it back.






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-25 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1274291759


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
Review Comment:
   > the failed inflight compaction plan which will never been re-run
   
   Can we fix that rollback by including the inflight compactions instead of 
introducing new config options?






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-24 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1273077993


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
Review Comment:
   Got it, so do you still need this PR?






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-24 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1272922937


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
Review Comment:
   So the failed compaction/clustering would eventually get rolled back by the subsequent executions?






[GitHub] [hudi] danny0405 commented on a diff in pull request #9229: [HUDI-6565] Spark offline compaction add failed retry mechanism

2023-07-19 Thread via GitHub


danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1268974007


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
Review Comment:
   Another choice is to always roll back the previous pending compaction before starting a new one; we only need to ensure that the compaction can finish within the compaction job scheduling interval.
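   The rollback-first alternative can be sketched as follows; the deque standing in for the pending-compaction timeline and the method name are hypothetical, not Hudi's API:

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Deque;
   import java.util.List;

   public class RollbackFirstDemo {
     // Roll back every previously pending compaction, then schedule the new one.
     // Returns the instants that were rolled back.
     static List<String> rollbackThenSchedule(Deque<String> pendingCompactions, String newInstant) {
       List<String> rolledBack = new ArrayList<>();
       while (!pendingCompactions.isEmpty()) {
         rolledBack.add(pendingCompactions.poll()); // roll back oldest first
       }
       pendingCompactions.add(newInstant); // schedule the fresh compaction
       return rolledBack;
     }

     public static void main(String[] args) {
       Deque<String> pending = new ArrayDeque<>(Arrays.asList("001", "002"));
       List<String> rolledBack = rollbackThenSchedule(pending, "003");
       System.out.println(rolledBack + " -> pending " + pending); // prints [001, 002] -> pending [003]
     }
   }
   ```

   This trades extra rollback work for simplicity: no heartbeat bookkeeping is needed, as long as each compaction finishes within the scheduling interval.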



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org