This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f53bca4  [HUDI-1655] Support custom date format and fix unsupported 
exception in DatePartitionPathSelector (#2621)
f53bca4 is described below

commit f53bca404f1482e0e99ad683dd29bfaff8bfb8ab
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Thu Mar 4 21:01:51 2021 -0800

    [HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (#2621)
    
    - Add a config to allow parsing a custom date format in
    `DatePartitionPathSelector`. Previously it assumed the date partition string
    was always in the format `yyyy-MM-dd`.
    - Fix a bug where `UnsupportedOperationException` was thrown when sorting
    `eligibleFiles` in place. The files are now sorted into a new list instead.
---
 .../sources/helpers/DatePartitionPathSelector.java | 30 +++++++++++------
 .../helpers/TestDatePartitionPathSelector.java     | 38 ++++++++++++----------
 2 files changed, 41 insertions(+), 27 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 2cedb6c..c22657f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -35,13 +35,16 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_FORMAT;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
@@ -59,12 +62,16 @@ import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelecto
  * <p>The date based partition is expected to be of the format '<date 
string>=yyyy-mm-dd' or
  * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition 
path can be of the
  * form 
`<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
- * `<basepath>/<<date-based-partition>/`
+ * `<basepath>/<<date-based-partition>/`.
+ *
+ * <p>The date based partition format can be configured via this property
+ * hoodie.deltastreamer.source.dfs.datepartitioned.date.format
  */
 public class DatePartitionPathSelector extends DFSPathSelector {
 
   private static volatile Logger LOG = 
LogManager.getLogger(DatePartitionPathSelector.class);
 
+  private final String dateFormat;
   private final int datePartitionDepth;
   private final int numPrevDaysToList;
   private final LocalDate fromDate;
@@ -73,6 +80,9 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
 
   /** Configs supported. */
   public static class Config {
+    public static final String DATE_FORMAT = 
"hoodie.deltastreamer.source.dfs.datepartitioned.date.format";
+    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+
     public static final String DATE_PARTITION_DEPTH =
         "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
     public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no 
(date) partition
@@ -84,7 +94,6 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
     public static final String CURRENT_DATE =
         "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
 
-
     public static final String PARTITIONS_LIST_PARALLELISM =
         "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
     public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
@@ -96,6 +105,7 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
      * datePartitionDepth = 0 is same as basepath and there is no partition. 
In which case
      * this path selector would be a no-op and lists all paths under the table 
basepath.
      */
+    dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
     datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, 
DEFAULT_DATE_PARTITION_DEPTH);
     // If not specified the current date is assumed by default.
     currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, 
LocalDate.now().toString()));
@@ -130,20 +140,19 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
           FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
           return listEligibleFiles(fs, new Path(path), 
lastCheckpointTime).stream();
         }, partitionsListParallelism);
-    // sort them by modification time.
-    
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+    // sort them by modification time ascending.
+    List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
+        
.sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
 
     // Filter based on checkpoint & input size, if needed
     long currentBytes = 0;
-    long maxModificationTime = Long.MIN_VALUE;
     List<FileStatus> filteredFiles = new ArrayList<>();
-    for (FileStatus f : eligibleFiles) {
+    for (FileStatus f : sortedEligibleFiles) {
       if (currentBytes + f.getLen() >= sourceLimit) {
         // we have enough data, we are done
         break;
       }
 
-      maxModificationTime = f.getModificationTime();
       currentBytes += f.getLen();
       filteredFiles.add(f);
     }
@@ -156,7 +165,7 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
 
     // read the files out.
     String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
-
+    long maxModificationTime = filteredFiles.get(filteredFiles.size() - 
1).getModificationTime();
     return new ImmutablePair<>(Option.ofNullable(pathStr), 
String.valueOf(maxModificationTime));
   }
 
@@ -193,14 +202,15 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
           String[] splits = s.split("/");
           String datePartition = splits[splits.length - 1];
           LocalDate partitionDate;
+          DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern(dateFormat);
           if (datePartition.contains("=")) {
             String[] moreSplit = datePartition.split("=");
             ValidationUtils.checkArgument(
                 moreSplit.length == 2,
                 "Partition Field (" + datePartition + ") not in expected 
format");
-            partitionDate = LocalDate.parse(moreSplit[1]);
+            partitionDate = LocalDate.parse(moreSplit[1], dateFormatter);
           } else {
-            partitionDate = LocalDate.parse(datePartition);
+            partitionDate = LocalDate.parse(datePartition, dateFormatter);
           }
           return (partitionDate.isEqual(fromDate) || 
partitionDate.isAfter(fromDate))
               && (partitionDate.isEqual(currentDate) || 
partitionDate.isBefore(currentDate));
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index b7e1279..30d0993 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
 import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,11 +80,11 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
   /*
    * Create Date partitions with some files under each of the leaf Dirs.
    */
-  public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean 
hiveStyle)
+  public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean 
hiveStyle, String dateFormat)
       throws IOException {
     List<Path> allFiles = new ArrayList<>();
     for (Path path : leafDirs) {
-      List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+      List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle, 
dateFormat);
       for (Path datePartition : datePartitions) {
         allFiles.addAll(createRandomFilesUnder(datePartition));
       }
@@ -126,11 +128,12 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
   /*
    * Generate date based partitions under a parent dir with or without 
hivestyle formatting.
    */
-  private List<Path> generateDatePartitionsUnder(Path parent, boolean 
hiveStyle) throws IOException {
+  private List<Path> generateDatePartitionsUnder(Path parent, boolean 
hiveStyle, String dateFormat) throws IOException {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
     List<Path> datePartitions = new ArrayList<>();
     String prefix = (hiveStyle ? "dt=" : "");
     for (int i = 0; i < 5; i++) {
-      Path child = new Path(parent, prefix + totalDates.get(i).toString());
+      Path child = new Path(parent, prefix + 
formatter.format(totalDates.get(i)));
       fs.mkdirs(child);
       datePartitions.add(child);
     }
@@ -155,9 +158,10 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
   }
 
   private static TypedProperties getProps(
-      String basePath, int datePartitionDepth, int numDaysToList, String 
currentDate) {
+      String basePath, String dateFormat, int datePartitionDepth, int 
numDaysToList, String currentDate) {
     TypedProperties properties = new TypedProperties();
     properties.put(ROOT_INPUT_PATH_PROP, basePath);
+    properties.put(DATE_FORMAT, dateFormat);
     properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
     properties.put(LOOKBACK_DAYS, "" + numDaysToList);
     properties.put(CURRENT_DATE, currentDate);
@@ -172,14 +176,14 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
   private static Stream<Arguments> configParams() {
     Object[][] data =
         new Object[][] {
-          {"table1", 0, 2, "2020-07-25", true, 1},
-          {"table2", 0, 2, "2020-07-25", false, 1},
-          {"table3", 1, 3, "2020-07-25", true, 4},
-          {"table4", 1, 3, "2020-07-25", false, 4},
-          {"table5", 2, 1, "2020-07-25", true, 10},
-          {"table6", 2, 1, "2020-07-25", false, 10},
-          {"table7", 3, 2, "2020-07-25", true, 75},
-          {"table8", 3, 2, "2020-07-25", false, 75}
+          {"table1", "yyyyMMdd", 0, 2, "2020-07-25", true, 1},
+          {"table2", "yyyyMMdd", 0, 2, "2020-07-25", false, 1},
+          {"table3", "yyyyMMMdd", 1, 3, "2020-07-25", true, 4},
+          {"table4", "yyyyMMMdd", 1, 3, "2020-07-25", false, 4},
+          {"table5", "yyyy-MM-dd", 2, 1, "2020-07-25", true, 10},
+          {"table6", "yyyy-MM-dd", 2, 1, "2020-07-25", false, 10},
+          {"table7", "yyyy-MMM-dd", 3, 2, "2020-07-25", true, 75},
+          {"table8", "yyyy-MMM-dd", 3, 2, "2020-07-25", false, 75}
         };
     return Stream.of(data).map(Arguments::of);
   }
@@ -188,13 +192,14 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
   @MethodSource("configParams")
   public void testPruneDatePartitionPaths(
       String tableName,
+      String dateFormat,
       int datePartitionDepth,
       int numPrevDaysToList,
       String currentDate,
       boolean isHiveStylePartition,
       int expectedNumFiles)
       throws IOException {
-    TypedProperties props = getProps(basePath + "/" + tableName, 
datePartitionDepth, numPrevDaysToList, currentDate);
+    TypedProperties props = getProps(basePath + "/" + tableName, dateFormat, 
datePartitionDepth, numPrevDaysToList, currentDate);
     DatePartitionPathSelector pathSelector = new 
DatePartitionPathSelector(props, jsc.hadoopConfiguration());
 
     Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
@@ -203,10 +208,9 @@ public class TestDatePartitionPathSelector extends 
HoodieClientTestHarness {
     // Create parent dir
     List<Path> leafDirs = new ArrayList<>();
     createParentDirsBeforeDatePartitions(root, generateRandomStrings(), 
totalDepthBeforeDatePartitions, leafDirs);
-    createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+    createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
 
     List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, 
root.toString());
-
-    assertEquals(expectedNumFiles, 
pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
+    assertEquals(expectedNumFiles, paths.size());
   }
 }

Reply via email to