[ https://issues.apache.org/jira/browse/HUDI-4736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-4736:
-----------------------------
    Sprint: 2022/08/22, 2022/09/05 (was: 2022/08/22)

> Fix inflight clean action preventing clean service to continue when multiple cleans are not allowed
> ---------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-4736
>                 URL: https://issues.apache.org/jira/browse/HUDI-4736
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: cleaning
>    Affects Versions: 0.11.0, 0.11.1
>            Reporter: Ethan Guo
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.12.1
>
>
> With Hudi Deltastreamer async cleaning, if the Spark job fails in the middle of
> cleaning and leaves the clean instant inflight, the retried Spark job may not
> resume the inflight clean action when `hoodie.clean.allow.multiple` is `false`,
> i.e., when multiple clean schedules are disabled. This is due to a bug in the
> code below.
>
> Relevant logic in BaseHoodieWriteClient:
> {code:java}
> public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
>   if (!tableServicesEnabled(config)) {
>     return null;
>   }
>   final Timer.Context timerContext = metrics.getCleanCtx();
>   CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
>       HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
>
>   HoodieCleanMetadata metadata = null;
>   HoodieTable table = createTable(config, hadoopConf);
>   // Bug: if allowMultipleCleans() is false and a clean is already requested or
>   // inflight (e.g., left behind by a failed job), this whole block is skipped,
>   // so table.clean(...) never runs and the pending clean is never resumed.
>   if (config.allowMultipleCleans()
>       || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
>     LOG.info("Cleaner started");
>     // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
>     if (scheduleInline) {
>       scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
>       table.getMetaClient().reloadActiveTimeline();
>     }
>
>     metadata = table.clean(context, cleanInstantTime, skipLocking);
>     if (timerContext != null && metadata != null) {
>       long durationMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
>       LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
>           + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
>           + " cleanerElapsedMs" + durationMs);
>     }
>   }
>   return metadata;
> }
> {code}
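
The failure mode described in the issue can be reduced to a small, self-contained Java model. This is a hypothetical illustration only: `CleanFlowSketch`, `pendingCleans`, `completedCleans`, `executePendingCleans`, `cleanBuggy` and `cleanFixed` are not Hudi APIs, and the model assumes that `table.clean(...)` executes any requested or inflight clean instants on the timeline. `cleanFixed` shows one possible shape of a fix (gate only the scheduling of a new clean on `hoodie.clean.allow.multiple`, but always run pending cleans); it is a sketch under those assumptions, not the change actually merged for HUDI-4736.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the control flow in BaseHoodieWriteClient#clean.
 * None of these names are Hudi APIs.
 */
class CleanFlowSketch {

    /** Stand-in for the cleaner timeline's requested/inflight clean instants. */
    static final List<String> pendingCleans = new ArrayList<>();
    /** Clean instants that have actually been executed. */
    static final List<String> completedCleans = new ArrayList<>();

    /** Assumed stand-in for table.clean(...): executes every pending clean. */
    static void executePendingCleans() {
        completedCleans.addAll(pendingCleans);
        pendingCleans.clear();
    }

    /** Mirrors the buggy logic: scheduling AND execution are both skipped when a clean is pending. */
    static void cleanBuggy(String instant, boolean allowMultipleCleans) {
        if (allowMultipleCleans || pendingCleans.isEmpty()) {
            pendingCleans.add(instant); // models scheduleInline == true
            executePendingCleans();
        }
        // Otherwise nothing happens, so a clean left inflight by a failed job is stranded.
    }

    /** One possible fix: gate only the scheduling, then always execute pending cleans. */
    static void cleanFixed(String instant, boolean allowMultipleCleans) {
        if (allowMultipleCleans || pendingCleans.isEmpty()) {
            pendingCleans.add(instant); // schedule a new clean only when allowed
        }
        executePendingCleans(); // resumes any clean left inflight by a previous run
    }

    public static void main(String[] args) {
        // Simulate a previous job that failed mid-clean, leaving instant "001" inflight.
        pendingCleans.add("001");
        cleanBuggy("002", false);
        System.out.println("buggy: pending=" + pendingCleans + " completed=" + completedCleans);

        cleanFixed("002", false);
        System.out.println("fixed: pending=" + pendingCleans + " completed=" + completedCleans);
    }
}
```

Running `main` prints `buggy: pending=[001] completed=[]` and then `fixed: pending=[] completed=[001]`: with multiple cleans disabled, the buggy path leaves the clean from the failed job stranded, while the fixed path resumes it without scheduling a second clean.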