suryaprasanna commented on code in PR #17930:
URL: https://github.com/apache/hudi/pull/17930#discussion_r2729694972
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2012,19 +2015,25 @@ public void performTableServices(Option<String>
inFlightInstantTimestamp, boolea
static HoodieActiveTimeline
runPendingTableServicesOperationsAndRefreshTimeline(HoodieTableMetaClient
metadataMetaClient,
BaseHoodieWriteClient<?, ?, ?, ?> writeClient,
-
boolean initialTimelineRequiresRefresh) {
- HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ?
metadataMetaClient.reloadActiveTimeline() :
metadataMetaClient.getActiveTimeline();
- // finish off any pending log compaction or compactions operations if any
from previous attempt.
- boolean ranServices = false;
- if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) {
- writeClient.runAnyPendingCompactions();
- ranServices = true;
- }
- if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() >
0) {
- writeClient.runAnyPendingLogCompactions();
- ranServices = true;
+
boolean initialTimelineRequiresRefresh,
+
Option<HoodieMetadataMetrics> metricsOption) {
+ try {
+ HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ?
metadataMetaClient.reloadActiveTimeline() :
metadataMetaClient.getActiveTimeline();
+ // finish off any pending log compaction or compactions operations if
any from previous attempt.
+ boolean ranServices = false;
+ if (activeTimeline.filterPendingCompactionTimeline().countInstants() >
0) {
+ writeClient.runAnyPendingCompactions();
+ ranServices = true;
+ }
+ if (activeTimeline.filterPendingLogCompactionTimeline().countInstants()
> 0) {
+ writeClient.runAnyPendingLogCompactions();
+ ranServices = true;
+ }
+ return ranServices ? metadataMetaClient.reloadActiveTimeline() :
activeTimeline;
+ } finally {
+ metricsOption.ifPresent(m -> m.incrementMetric(
 Review Comment:
   Good catch; I will make the necessary changes.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2063,33 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O>
writeClient, Option<Strin
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
- LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
- } else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with new instant time.
- Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
- if (scheduledLogCompaction.isPresent()) {
- LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
- writeClient.logCompact(scheduledLogCompaction.get(), true);
+ boolean skipCompactions =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime, true);
}
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing compaction ", e);
Review Comment:
Made the necessary changes.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2063,33 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O>
writeClient, Option<Strin
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
- LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
- } else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with new instant time.
- Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
- if (scheduledLogCompaction.isPresent()) {
- LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
- writeClient.logCompact(scheduledLogCompaction.get(), true);
+ boolean skipCompactions =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime, true);
}
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing compaction ", e);
+ }
+
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with new instant time.
+ Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
+ if (scheduledLogCompaction.isPresent()) {
+ LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
+ writeClient.logCompact(scheduledLogCompaction.get(), true);
+ }
+ }
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing logcompaction ", e);
Review Comment:
Made the necessary changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]