This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch release-0.6.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.6.0 by this push: new 62c297e [HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987) 62c297e is described below commit 62c297e2ffa891bd19c1ea402293abcfd0074c59 Author: Pratyaksh Sharma <pratyaks...@gmail.com> AuthorDate: Thu Aug 20 06:13:34 2020 +0530 [HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987) Co-authored-by: Bhavani Sudha Saktheeswaran <bhavanisud...@gmail.com> --- .../hudi/keygen/TimestampBasedKeyGenerator.java | 40 ++++++++++++++-------- .../hudi/keygen/parser/HoodieDateTimeParser.java | 7 ++-- .../keygen/parser/HoodieDateTimeParserImpl.java | 22 ++++++------ 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index 0209fe8..25a52fe 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceUtils; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -40,7 +41,6 @@ import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.text.ParseException; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -61,7 +61,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { private final TimeUnit timeUnit; private final TimestampType 
timestampType; private final String outputDateFormat; - private DateTimeFormatter inputFormatter; + private transient Option<DateTimeFormatter> inputFormatter; + private transient DateTimeFormatter partitionFormatter; private final HoodieDateTimeParser parser; // TimeZone detailed settings reference @@ -108,13 +109,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass); this.outputDateTimeZone = parser.getOutputDateTimeZone(); this.outputDateFormat = parser.getOutputDateFormat(); - this.inputFormatter = parser.getInputFormatter(); this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); - if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { - this.inputFormatter = parser.getInputFormatter(); - } - switch (this.timestampType) { case EPOCHMILLISECONDS: timeUnit = MILLISECONDS; @@ -147,17 +143,28 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { } /** + * The function takes care of lazily initialising dateTimeFormatter variables only once. + */ + private void initIfNeeded() { + if (this.inputFormatter == null) { + this.inputFormatter = parser.getInputFormatter(); + } + if (this.partitionFormatter == null) { + this.partitionFormatter = DateTimeFormat.forPattern(outputDateFormat); + if (this.outputDateTimeZone != null) { + partitionFormatter = partitionFormatter.withZone(outputDateTimeZone); + } + } + } + + /** * Parse and fetch partition path based on data type. 
* * @param partitionVal partition path object value fetched from record/row * @return the parsed partition path based on data type - * @throws ParseException on any parse exception */ - private String getPartitionPath(Object partitionVal) throws ParseException { - DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat); - if (this.outputDateTimeZone != null) { - partitionFormatter = partitionFormatter.withZone(outputDateTimeZone); - } + private String getPartitionPath(Object partitionVal) { + initIfNeeded(); long timeMs; if (partitionVal instanceof Double) { timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue()); @@ -166,13 +173,16 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { } else if (partitionVal instanceof Long) { timeMs = convertLongTimeToMillis((Long) partitionVal); } else if (partitionVal instanceof CharSequence) { - DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString()); + if (!inputFormatter.isPresent()) { + throw new HoodieException("Missing inputformatter. 
Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!"); + } + DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString()); if (this.outputDateTimeZone == null) { // Use the timezone that came off the date that was passed in, if it had one partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone()); } - timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis(); + timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis(); } else { throw new HoodieNotSupportedException( "Unexpected type for partition field: " + partitionVal.getClass().getName()); diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java index 3550193..6612f4c 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java @@ -17,10 +17,13 @@ package org.apache.hudi.keygen.parser; +import org.apache.hudi.common.util.Option; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; -public interface HoodieDateTimeParser { +import java.io.Serializable; + +public interface HoodieDateTimeParser extends Serializable { /** * Returns the output date format in which the partition paths will be created for the hudi dataset. @@ -32,7 +35,7 @@ public interface HoodieDateTimeParser { * Returns input formats in which datetime based values might be coming in incoming records. * @return */ - DateTimeFormatter getInputFormatter(); + Option<DateTimeFormatter> getInputFormatter(); /** * Returns the datetime zone one should expect the incoming values into. 
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java index 933e1af..11790cb 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java @@ -19,6 +19,7 @@ package org.apache.hudi.keygen.parser; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config; import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType; import org.joda.time.DateTimeZone; @@ -37,7 +38,6 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa private String configInputDateFormatList; private final String configInputDateFormatDelimiter; private final TypedProperties config; - private DateTimeFormatter inputFormatter; // TimeZone detailed settings reference // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html @@ -48,14 +48,6 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); this.inputDateTimeZone = getInputDateTimeZone(); this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter(); - - TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); - if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { - DataSourceUtils.checkRequiredProperties(config, - Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); - this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); - inputFormatter = getInputDateFormatter(); - } } private String 
getConfigInputDateFormatDelimiter() { @@ -94,8 +86,16 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa } @Override - public DateTimeFormatter getInputFormatter() { - return this.inputFormatter; + public Option<DateTimeFormatter> getInputFormatter() { + TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); + if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { + DataSourceUtils.checkRequiredProperties(config, + Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); + return Option.of(getInputDateFormatter()); + } + + return Option.empty(); } @Override