This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f53bca4 [HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (#2621) f53bca4 is described below commit f53bca404f1482e0e99ad683dd29bfaff8bfb8ab Author: Raymond Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Thu Mar 4 21:01:51 2021 -0800 [HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (#2621) - Add a config to allow parsing custom date format in `DatePartitionPathSelector`. Currently it assumes the date partition string is in the format `yyyy-MM-dd`. - Fix a bug where `UnsupportedOperationException` was thrown when sorting `eligibleFiles` in place. Changed to sort it and store the result in a new list. --- .../sources/helpers/DatePartitionPathSelector.java | 30 +++++++++++------ .../helpers/TestDatePartitionPathSelector.java | 38 ++++++++++++---------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java index 2cedb6c..c22657f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java @@ -35,13 +35,16 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; +import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT; import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH; +import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_FORMAT; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM; @@ -59,12 +62,16 @@ import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelecto * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or - * `<basepath>/<<date-based-partition>/` + * `<basepath>/<<date-based-partition>/`. + * + * <p>The date based partition format can be configured via this property + * hoodie.deltastreamer.source.dfs.datepartitioned.date.format */ public class DatePartitionPathSelector extends DFSPathSelector { private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class); + private final String dateFormat; private final int datePartitionDepth; private final int numPrevDaysToList; private final LocalDate fromDate; @@ -73,6 +80,9 @@ public class DatePartitionPathSelector extends DFSPathSelector { /** Configs supported. 
*/ public static class Config { + public static final String DATE_FORMAT = "hoodie.deltastreamer.source.dfs.datepartitioned.date.format"; + public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd"; + public static final String DATE_PARTITION_DEPTH = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth"; public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition @@ -84,7 +94,6 @@ public class DatePartitionPathSelector extends DFSPathSelector { public static final String CURRENT_DATE = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate"; - public static final String PARTITIONS_LIST_PARALLELISM = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism"; public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20; @@ -96,6 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { * datePartitionDepth = 0 is same as basepath and there is no partition. In which case * this path selector would be a no-op and lists all paths under the table basepath. */ + dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT); datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH); // If not specified the current date is assumed by default. currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); @@ -130,20 +140,19 @@ public class DatePartitionPathSelector extends DFSPathSelector { FileSystem fs = new Path(path).getFileSystem(serializedConf.get()); return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream(); }, partitionsListParallelism); - // sort them by modification time. - eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime)); + // sort them by modification time ascending. 
+ List<FileStatus> sortedEligibleFiles = eligibleFiles.stream() + .sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList()); // Filter based on checkpoint & input size, if needed long currentBytes = 0; - long maxModificationTime = Long.MIN_VALUE; List<FileStatus> filteredFiles = new ArrayList<>(); - for (FileStatus f : eligibleFiles) { + for (FileStatus f : sortedEligibleFiles) { if (currentBytes + f.getLen() >= sourceLimit) { // we have enough data, we are done break; } - maxModificationTime = f.getModificationTime(); currentBytes += f.getLen(); filteredFiles.add(f); } @@ -156,7 +165,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { // read the files out. String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); - + long maxModificationTime = filteredFiles.get(filteredFiles.size() - 1).getModificationTime(); return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime)); } @@ -193,14 +202,15 @@ public class DatePartitionPathSelector extends DFSPathSelector { String[] splits = s.split("/"); String datePartition = splits[splits.length - 1]; LocalDate partitionDate; + DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(dateFormat); if (datePartition.contains("=")) { String[] moreSplit = datePartition.split("="); ValidationUtils.checkArgument( moreSplit.length == 2, "Partition Field (" + datePartition + ") not in expected format"); - partitionDate = LocalDate.parse(moreSplit[1]); + partitionDate = LocalDate.parse(moreSplit[1], dateFormatter); } else { - partitionDate = LocalDate.parse(datePartition); + partitionDate = LocalDate.parse(datePartition, dateFormatter); } return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate)) && (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate)); diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java index b7e1279..30d0993 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java @@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -40,6 +41,7 @@ import java.util.stream.Stream; import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE; +import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -78,11 +80,11 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { /* * Create Date partitions with some files under each of the leaf Dirs. 
*/ - public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle) + public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle, String dateFormat) throws IOException { List<Path> allFiles = new ArrayList<>(); for (Path path : leafDirs) { - List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle); + List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle, dateFormat); for (Path datePartition : datePartitions) { allFiles.addAll(createRandomFilesUnder(datePartition)); } @@ -126,11 +128,12 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { /* * Generate date based partitions under a parent dir with or without hivestyle formatting. */ - private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle) throws IOException { + private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle, String dateFormat) throws IOException { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat); List<Path> datePartitions = new ArrayList<>(); String prefix = (hiveStyle ? 
"dt=" : ""); for (int i = 0; i < 5; i++) { - Path child = new Path(parent, prefix + totalDates.get(i).toString()); + Path child = new Path(parent, prefix + formatter.format(totalDates.get(i))); fs.mkdirs(child); datePartitions.add(child); } @@ -155,9 +158,10 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { } private static TypedProperties getProps( - String basePath, int datePartitionDepth, int numDaysToList, String currentDate) { + String basePath, String dateFormat, int datePartitionDepth, int numDaysToList, String currentDate) { TypedProperties properties = new TypedProperties(); properties.put(ROOT_INPUT_PATH_PROP, basePath); + properties.put(DATE_FORMAT, dateFormat); properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth); properties.put(LOOKBACK_DAYS, "" + numDaysToList); properties.put(CURRENT_DATE, currentDate); @@ -172,14 +176,14 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { private static Stream<Arguments> configParams() { Object[][] data = new Object[][] { - {"table1", 0, 2, "2020-07-25", true, 1}, - {"table2", 0, 2, "2020-07-25", false, 1}, - {"table3", 1, 3, "2020-07-25", true, 4}, - {"table4", 1, 3, "2020-07-25", false, 4}, - {"table5", 2, 1, "2020-07-25", true, 10}, - {"table6", 2, 1, "2020-07-25", false, 10}, - {"table7", 3, 2, "2020-07-25", true, 75}, - {"table8", 3, 2, "2020-07-25", false, 75} + {"table1", "yyyyMMdd", 0, 2, "2020-07-25", true, 1}, + {"table2", "yyyyMMdd", 0, 2, "2020-07-25", false, 1}, + {"table3", "yyyyMMMdd", 1, 3, "2020-07-25", true, 4}, + {"table4", "yyyyMMMdd", 1, 3, "2020-07-25", false, 4}, + {"table5", "yyyy-MM-dd", 2, 1, "2020-07-25", true, 10}, + {"table6", "yyyy-MM-dd", 2, 1, "2020-07-25", false, 10}, + {"table7", "yyyy-MMM-dd", 3, 2, "2020-07-25", true, 75}, + {"table8", "yyyy-MMM-dd", 3, 2, "2020-07-25", false, 75} }; return Stream.of(data).map(Arguments::of); } @@ -188,13 +192,14 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness { @MethodSource("configParams") public void testPruneDatePartitionPaths( String tableName, + String dateFormat, int datePartitionDepth, int numPrevDaysToList, String currentDate, boolean isHiveStylePartition, int expectedNumFiles) throws IOException { - TypedProperties props = getProps(basePath + "/" + tableName, datePartitionDepth, numPrevDaysToList, currentDate); + TypedProperties props = getProps(basePath + "/" + tableName, dateFormat, datePartitionDepth, numPrevDaysToList, currentDate); DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration()); Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP)); @@ -203,10 +208,9 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { // Create parent dir List<Path> leafDirs = new ArrayList<>(); createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs); - createDatePartitionsWithFiles(leafDirs, isHiveStylePartition); + createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat); List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString()); - - assertEquals(expectedNumFiles, pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size()); + assertEquals(expectedNumFiles, paths.size()); } }