nsivabalan commented on code in PR #17930:
URL: https://github.com/apache/hudi/pull/17930#discussion_r2734001440
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline
createMockTimeline(List<HoodieInstant> instants) {
timeline.setInstants(instants);
return timeline;
}
+
+ @Test
+ void
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
{
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+ when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+ // Simulate compaction failure
+ RuntimeException compactionException = new RuntimeException("Compaction
failed");
+ doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+ // When shouldFailOnMDTTableServiceFailures is true (default), exception
should propagate
+ assertThrows(RuntimeException.class, () ->
Review Comment:
When the config value is false, can we validate that the exception is
swallowed and the caller does not see it?
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -882,6 +890,10 @@ public int getRecordPreparationParallelism() {
return getIntOrDefault(RECORD_PREPARATION_PARALLELISM);
}
+ public boolean shouldFailOnMDTTableServiceFailures() {
Review Comment:
This config already resides in `HoodieMetadataConfig`, so we could name this
`shouldFailOnTableServiceFailures`.
In general, we try to avoid using the "MDT" string in method or
variable naming where possible.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -604,6 +604,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
+ "honor the set value for number of tasks. If not, number of write
status's from data "
+ "table writes will be used for metadata table record preparation");
+ public static final ConfigProperty<Boolean>
FAIL_ON_MDT_TABLE_SERVICE_FAILURES = ConfigProperty
+ .key(METADATA_PREFIX + ".fail.on.table.services.failures")
Review Comment:
Minor:
we should align the config key and the config property name.
`METADATA_PREFIX + ".fail.on.table.services.failures"` -> `METADATA_PREFIX +
".fail.on.table.service.failures"`
Essentially, "services" -> "service".
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -604,6 +604,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
+ "honor the set value for number of tasks. If not, number of write
status's from data "
+ "table writes will be used for metadata table record preparation");
+ public static final ConfigProperty<Boolean>
FAIL_ON_MDT_TABLE_SERVICE_FAILURES = ConfigProperty
+ .key(METADATA_PREFIX + ".fail.on.table.services.failures")
Review Comment:
Oh, let's name the config property
`FAIL_ON_TABLE_SERVICE_FAILURES`.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -1189,6 +1201,11 @@ public Builder withRepartitionDefaultPartitions(int
defaultPartitions) {
return this;
}
+ public Builder setFailJobOnMDTTableServiceFailures(boolean
failJobOnMDTTableServiceFailures) {
Review Comment:
Again, aligning naming is very important. It might seem trivial, but it will
matter down the line:
`setFailOnTableServiceFailures(boolean failOnTableServiceFailures)`
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline
createMockTimeline(List<HoodieInstant> instants) {
timeline.setInstants(instants);
return timeline;
}
+
+ @Test
+ void
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
{
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+ when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+ // Simulate compaction failure
+ RuntimeException compactionException = new RuntimeException("Compaction
failed");
+ doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+ // When shouldFailOnMDTTableServiceFailures is true (default), exception
should propagate
+ assertThrows(RuntimeException.class, () ->
+
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+ metaClient, writeClient, true, Option.empty()),
+ "Expected exception to be thrown when
shouldFailOnMDTTableServiceFailures is true");
+
+ verify(writeClient, times(1)).runAnyPendingCompactions();
+ }
+
+ @Test
+ void
testRunPendingTableServicesOperations_withLogCompactionFailure_shouldIncrementMetric()
{
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+ HoodieMetadataMetrics metrics = mock(HoodieMetadataMetrics.class);
+
+ when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(0);
+
when(timeline.filterPendingLogCompactionTimeline().countInstants()).thenReturn(1);
+
+ // Simulate log compaction failure
+ HoodieException logCompactionException = new HoodieException("Log
compaction failed");
+
doThrow(logCompactionException).when(writeClient).runAnyPendingLogCompactions();
+
+ // Exception should propagate and metrics should be incremented
+ assertThrows(HoodieException.class, () ->
+
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+ metaClient, writeClient, true, Option.of(metrics)),
+ "Expected exception to be thrown when log compaction fails");
+
+ verify(writeClient, times(1)).runAnyPendingLogCompactions();
+ verify(metrics, times(1)).incrementMetric(
+ HoodieMetadataMetrics.PENDING_COMPACTIONS_FAILURES, 1);
Review Comment:
Shouldn't we also validate that "logcompaction_failures" is emitted?
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline
createMockTimeline(List<HoodieInstant> instants) {
timeline.setInstants(instants);
return timeline;
}
+
+ @Test
+ void
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
{
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+ when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+ // Simulate compaction failure
+ RuntimeException compactionException = new RuntimeException("Compaction
failed");
+ doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+ // When shouldFailOnMDTTableServiceFailures is true (default), exception
should propagate
+ assertThrows(RuntimeException.class, () ->
+
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+ metaClient, writeClient, true, Option.empty()),
+ "Expected exception to be thrown when
shouldFailOnMDTTableServiceFailures is true");
+
+ verify(writeClient, times(1)).runAnyPendingCompactions();
Review Comment:
Where are we validating the metrics?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2064,35 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O>
writeClient, Option<Strin
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
- LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
- } else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with new instant time.
- Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
- if (scheduledLogCompaction.isPresent()) {
- LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
- writeClient.logCompact(scheduledLogCompaction.get(), true);
+ boolean skipCompactions =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime, true);
}
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing compaction on metadata
table ", e);
+ throw e;
+ }
+
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with new instant time.
+ Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
+ if (scheduledLogCompaction.isPresent()) {
+ LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
+ writeClient.logCompact(scheduledLogCompaction.get(), true);
+ }
+ }
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing logcompaction on metadata
table ", e);
Review Comment:
Minor:
this should read "in metadata table".
Also, it looks like there is double spacing between "on" and "metadata".
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2064,35 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O>
writeClient, Option<Strin
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
- LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
- } else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with new instant time.
- Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
- if (scheduledLogCompaction.isPresent()) {
- LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
- writeClient.logCompact(scheduledLogCompaction.get(), true);
+ boolean skipCompactions =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime, true);
}
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing compaction on metadata
table ", e);
+ throw e;
+ }
+
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with new instant time.
+ Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
+ if (scheduledLogCompaction.isPresent()) {
+ LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
+ writeClient.logCompact(scheduledLogCompaction.get(), true);
+ }
+ }
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing logcompaction on metadata
table ", e);
Review Comment:
Let's remove the trailing space at the end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]