This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c1ceb628e576dd50f9c3bdf1ab830dcf61f70296 Author: XuQianJin-Stars <forwar...@apache.com> AuthorDate: Sun Oct 23 17:35:58 2022 +0800 [MINOR] fix Invalid value for YearOfEra --- .../apache/hudi/client/BaseHoodieWriteClient.java | 28 ++++++++++++++-------- .../apache/hudi/client/SparkRDDWriteClient.java | 22 +++++++++++++++++ .../table/timeline/HoodieActiveTimeline.java | 18 ++++++++++++++ 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index d9f260e633..ff500a617e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -104,6 +104,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.text.ParseException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -115,6 +116,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY; +import static org.apache.hudi.common.model.TableServiceType.CLEAN; /** * Abstract Write Client providing functionality for performing commit, index updates and rollback @@ -306,14 +308,20 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf); void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { - if (writeTimer != null) { - long durationInMs = metrics.getDurationInMs(writeTimer.stop()); - // instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely` - // e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline - HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant -> - metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType) - ); - writeTimer = null; + try { + if (writeTimer != null) { + long durationInMs = metrics.getDurationInMs(writeTimer.stop()); + long commitEpochTimeInMs = 0; + if (HoodieActiveTimeline.checkDateTime(instantTime)) { + commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(); + } + metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, + metadata, actionType); + writeTimer = null; + } + } catch (ParseException e) { + throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime + + "Instant time is not of valid format", e); } } @@ -862,7 +870,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, LOG.info("Cleaner started"); // proceed only if multiple clean schedules are enabled or if there are no pending cleans. if (scheduleInline) { - scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); + scheduleTableServiceInternal(cleanInstantTime, Option.empty(), CLEAN); table.getMetaClient().reloadActiveTimeline(); } } @@ -1286,7 +1294,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, * @param extraMetadata Extra Metadata to be stored */ protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException { - return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent(); + return scheduleTableService(instantTime, extraMetadata, CLEAN).isPresent(); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 7110e26bb0..32c4a0a06d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; @@ -68,6 +69,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.nio.charset.StandardCharsets; +import java.text.ParseException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -324,6 +326,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant -> metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION) ); + try { + long commitEpochTimeInMs = 0; + if (HoodieActiveTimeline.checkDateTime(compactionCommitTime)) { + commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(); + } + metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + + config.getBasePath() + " at time " + compactionCommitTime, e); + } } LOG.info("Compacted successfully on commit " + compactionCommitTime); } @@ -406,6 +418,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant -> metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); + try { + long commitEpochTimeInMs = 0; + if (HoodieActiveTimeline.checkDateTime(clusteringCommitTime)) { + commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(); + } + metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + + config.getBasePath() + " at time " + clusteringCommitTime, e); + } } LOG.info("Clustering successfully on commit " + clusteringCommitTime); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 2b27d3ab5e..414e92e58b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -39,6 +39,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -49,6 +51,8 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.SECS_INSTANT_TIMESTAMP_FORMAT; + /** * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the * ActiveTimeline and the rest are Archived. ActiveTimeline is a special timeline that allows for creation of instants @@ -125,6 +129,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return parsedDate; } + /** + * Check if the instantTime is in SECS_INSTANT_TIMESTAMP_FORMAT format. + */ + public static boolean checkDateTime(String instantTime) { + DateTimeFormatter dtf = DateTimeFormatter.ofPattern(SECS_INSTANT_TIMESTAMP_FORMAT); + boolean flag = true; + try { + LocalDateTime.parse(instantTime, dtf); + } catch (Exception e) { + flag = false; + } + return flag; + } + /** * Format the Date to a String representing the timestamp of a Hoodie Instant. */