Blazer-007 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2941463056


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -244,17 +284,28 @@ public Extractor<String, FileAwareInputStream> 
getExtractor(WorkUnitState state)
    * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is 
specified separately via
    * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
    *
-   * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is 
appended to all partition values
-   * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For 
tables using standard daily partition
-   * format ({@code yyyy-MM-dd}), set {@code 
iceberg.hourly.partition.enabled=false}. Users should always provide
-   * dates in standard {@code yyyy-MM-dd} format; the hour suffix is 
automatically managed.
+   * <p><b>Partition Value Format:</b> The output partition value used in the 
filter expression is controlled by
+   * {@code iceberg.partition.value.format}, a standard {@link 
java.time.format.DateTimeFormatter} pattern.
+   * The {@code iceberg.filter.date} input is always provided in {@code 
yyyy-MM-dd} form; the format governs
+   * how it is rendered as a partition value. {@code iceberg.partition.hour} 
(0-23, default 0) supplies the
+   * hour component when the format includes {@code HH}. Examples:
+   * <ul>
+   *   <li>{@code yyyy-MM-dd-HH} with hour=5 → {@code 2025-04-01-05}</li>
+   *   <li>{@code dd-MM-yyyy-HH} with hour=0 → {@code 01-04-2025-00}</li>
+   *   <li>{@code yyyyMMdd}              → {@code 20250401}       (daily, 
compact)</li>

Review Comment:
   This Javadoc needs an update, as `iceberg.partition.hour` is no longer used.
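
To illustrate what the updated Javadoc could describe, here is a minimal sketch of rendering a partition value from a reference datetime with a `DateTimeFormatter` pattern, with no separate hour property involved. The class and method names (`PartitionValueSketch`, `render`) are hypothetical and not part of the PR; the sketch assumes the hour comes only from the reference datetime (midnight for a specific date, the current clock hour for `CURRENT_DATE`), as the code comments elsewhere in this diff describe.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not code from the PR.
final class PartitionValueSketch {

  // Renders the reference datetime using the configured partition pattern.
  static String render(LocalDateTime reference, String pattern) {
    return DateTimeFormatter.ofPattern(pattern).format(reference);
  }

  public static void main(String[] args) {
    // A specific filter date resolves to midnight, so HH renders as 00.
    LocalDateTime midnight = LocalDate.of(2025, 4, 1).atStartOfDay();
    System.out.println(render(midnight, "yyyy-MM-dd-HH")); // 2025-04-01-00
    System.out.println(render(midnight, "yyyyMMdd"));      // 20250401

    // A reference datetime at 05:00 embeds that hour in the value.
    LocalDateTime fiveAm = LocalDateTime.of(2025, 4, 1, 5, 0);
    System.out.println(render(fiveAm, "dd-MM-yyyy-HH"));   // 01-04-2025-05
  }
}
```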



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -280,68 +331,107 @@ private List<IcebergTable.FilePathWithPartition> 
discoverPartitionFilePaths(Sour
     Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
       "iceberg.filter.date is required when iceberg.filter.enabled=true");
 
-    // Handle CURRENT_DATE placeholder for flows
-    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
-      dateValue = LocalDate.now().toString();
-      log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
-    }
+    // Resolve the DateTimeFormatter used to render each partition value.
+    // resolvePartitionFormatter normalises both the new 
iceberg.partition.value.datetime.format
+    // path and the legacy iceberg.hourly.partition.enabled path into a single 
formatter.
+    DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);
 
-    // Apply lookback period for date partitions
-    // lookbackDays=1 (default) means copy only the specified date
-    // lookbackDays=3 means copy specified date + 2 previous days (total 3 
days)
-    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
-    List<String> values = Lists.newArrayList();
-
-    if (lookbackDays >= 1) {
-      log.info("Applying lookback period of {} days for date partition column 
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
-      // Check if hourly partitioning is enabled
-      boolean isHourlyPartition = 
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, 
DEFAULT_HOURLY_PARTITION_ENABLED);
-
-      // Parse the date in yyyy-MM-dd format
-      LocalDate start;
+    // Resolve the reference datetime for the filter.
+    // CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that 
includes HH will
+    // embed the current clock-hour automatically.  For a specific date 
(yyyy-MM-dd) the time
+    // defaults to midnight (00:00).
+    LocalDateTime startDateTime;
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      startDateTime = LocalDateTime.now();
+      log.info("Resolved {} placeholder to current datetime: {}", 
CURRENT_DATE_PLACEHOLDER, startDateTime);
+    } else {
       try {
-        start = LocalDate.parse(dateValue);
+        startDateTime = LocalDate.parse(dateValue).atStartOfDay();
       } catch (java.time.format.DateTimeParseException e) {
         String errorMsg = String.format(
           "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
           ICEBERG_FILTER_DATE, dateValue, e.getMessage());
         log.error(errorMsg);
         throw new IllegalArgumentException(errorMsg, e);
       }
+    }

Review Comment:
   Use `LocalDate.parse(dateValue, partitionFormatter)` here, and please update 
`errorMsg` as well so it no longer hard-codes `yyyy-MM-dd`.
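
A rough sketch of what this suggestion might look like, assuming the input date is parsed with the same pattern used for partition values and the error message reports that pattern instead of a hard-coded `yyyy-MM-dd`. The names `FilterDateParseSketch` and `parseFilterDate` are hypothetical; the real code would use `partitionFormatter` and `ICEBERG_FILTER_DATE` from `IcebergSource`. Note that `LocalDate.parse` keeps only the date fields, so any hour in the input is discarded in this sketch.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper for illustration only; not code from the PR.
final class FilterDateParseSketch {

  // Parses dateValue with the given pattern and returns midnight of that date.
  static LocalDateTime parseFilterDate(String dateValue, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    try {
      return LocalDate.parse(dateValue, formatter).atStartOfDay();
    } catch (DateTimeParseException e) {
      // Report the configured pattern rather than a fixed yyyy-MM-dd.
      String errorMsg = String.format(
          "Invalid date format for '%s': '%s'. Expected format: %s. Error: %s",
          "iceberg.filter.date", dateValue, pattern, e.getMessage());
      throw new IllegalArgumentException(errorMsg, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parseFilterDate("01-04-2025", "dd-MM-yyyy"));
    try {
      parseFilterDate("2025-04-01", "dd-MM-yyyy");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```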



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -280,68 +331,107 @@ private List<IcebergTable.FilePathWithPartition> 
discoverPartitionFilePaths(Sour
     Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
       "iceberg.filter.date is required when iceberg.filter.enabled=true");
 
-    // Handle CURRENT_DATE placeholder for flows
-    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
-      dateValue = LocalDate.now().toString();
-      log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
-    }
+    // Resolve the DateTimeFormatter used to render each partition value.
+    // resolvePartitionFormatter normalises both the new 
iceberg.partition.value.datetime.format
+    // path and the legacy iceberg.hourly.partition.enabled path into a single 
formatter.
+    DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);
 
-    // Apply lookback period for date partitions
-    // lookbackDays=1 (default) means copy only the specified date
-    // lookbackDays=3 means copy specified date + 2 previous days (total 3 
days)
-    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
-    List<String> values = Lists.newArrayList();
-
-    if (lookbackDays >= 1) {
-      log.info("Applying lookback period of {} days for date partition column 
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
-      // Check if hourly partitioning is enabled
-      boolean isHourlyPartition = 
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, 
DEFAULT_HOURLY_PARTITION_ENABLED);
-
-      // Parse the date in yyyy-MM-dd format
-      LocalDate start;
+    // Resolve the reference datetime for the filter.
+    // CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that 
includes HH will
+    // embed the current clock-hour automatically.  For a specific date 
(yyyy-MM-dd) the time
+    // defaults to midnight (00:00).
+    LocalDateTime startDateTime;
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      startDateTime = LocalDateTime.now();
+      log.info("Resolved {} placeholder to current datetime: {}", 
CURRENT_DATE_PLACEHOLDER, startDateTime);
+    } else {
       try {
-        start = LocalDate.parse(dateValue);
+        startDateTime = LocalDate.parse(dateValue).atStartOfDay();
       } catch (java.time.format.DateTimeParseException e) {
         String errorMsg = String.format(
           "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
           ICEBERG_FILTER_DATE, dateValue, e.getMessage());
         log.error(errorMsg);
         throw new IllegalArgumentException(errorMsg, e);
       }
+    }
 
-      for (int i = 0; i < lookbackDays; i++) {
-        String dateOnly = start.minusDays(i).toString();
-        // Append hour suffix if hourly partitioning is enabled
-        String partitionValue = isHourlyPartition ? dateOnly + 
HOURLY_PARTITION_SUFFIX : dateOnly;
-        values.add(partitionValue);
-        log.info("Including partition: {}={}", datePartitionColumn, 
partitionValue);
+    // Delegate partition value list + OR expression to 
IcebergPartitionFilterGenerator.
+    // When iceberg.lookback.hours > 0 it takes precedence over 
iceberg.lookback.days.
+    int lookbackHours = state.getPropAsInt(ICEBERG_LOOKBACK_HOURS, 
DEFAULT_LOOKBACK_HOURS);

Review Comment:
   Let's put an upper bound on `lookbackHours` as well. 48 (2 days) would be a 
reasonable limit: we don't want users scheduling hourly jobs with a lookback of 
more than 1 or 2 days.
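
One possible shape for such a bound, as a sketch only. The constant `MAX_LOOKBACK_HOURS`, the class `LookbackBoundsSketch`, and the choice to reject out-of-range values rather than clamp them are all assumptions, not something the PR defines. Zero stays valid because, per the code comment above, hours only take precedence over `iceberg.lookback.days` when greater than 0.

```java
// Hypothetical helper for illustration only; not code from the PR.
final class LookbackBoundsSketch {

  // Assumed upper bound of 48 hours (2 days), as proposed in the review.
  static final int MAX_LOOKBACK_HOURS = 48;

  // Returns the value unchanged when it is within [0, MAX_LOOKBACK_HOURS].
  static int validateLookbackHours(int lookbackHours) {
    if (lookbackHours < 0 || lookbackHours > MAX_LOOKBACK_HOURS) {
      throw new IllegalArgumentException(String.format(
          "iceberg.lookback.hours must be between 0 and %d, got %d",
          MAX_LOOKBACK_HOURS, lookbackHours));
    }
    return lookbackHours;
  }

  public static void main(String[] args) {
    System.out.println(validateLookbackHours(24)); // 24
    try {
      validateLookbackHours(72);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Failing fast keeps a misconfigured job from silently scanning more partitions than intended; clamping to the maximum with a warning would be the more lenient alternative.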



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -244,17 +284,28 @@ public Extractor<String, FileAwareInputStream> 
getExtractor(WorkUnitState state)
    * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is 
specified separately via
    * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
    *
-   * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is 
appended to all partition values
-   * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For 
tables using standard daily partition
-   * format ({@code yyyy-MM-dd}), set {@code 
iceberg.hourly.partition.enabled=false}. Users should always provide
-   * dates in standard {@code yyyy-MM-dd} format; the hour suffix is 
automatically managed.
+   * <p><b>Partition Value Format:</b> The output partition value used in the 
filter expression is controlled by
+   * {@code iceberg.partition.value.format}, a standard {@link 
java.time.format.DateTimeFormatter} pattern.
+   * The {@code iceberg.filter.date} input is always provided in {@code 
yyyy-MM-dd} form; the format governs
+   * how it is rendered as a partition value. {@code iceberg.partition.hour} 
(0-23, default 0) supplies the
+   * hour component when the format includes {@code HH}. Examples:
+   * <ul>
+   *   <li>{@code yyyy-MM-dd-HH} with hour=5 → {@code 2025-04-01-05}</li>
+   *   <li>{@code dd-MM-yyyy-HH} with hour=0 → {@code 01-04-2025-00}</li>
+   *   <li>{@code yyyyMMdd}              → {@code 20250401}       (daily, 
compact)</li>
+   * </ul>
+   * When {@code iceberg.partition.value.format} is set it supersedes {@code 
iceberg.hourly.partition.enabled}.
+   * When absent, the legacy {@code iceberg.hourly.partition.enabled} / {@code 
iceberg.partition.hour} behaviour
+   * is preserved for backward compatibility (default: appends {@code -00} 
suffix).
    *
    * <p><b>Configuration Examples:</b>
    * <ul>
-   * <li>Static: {@code iceberg.partition.column=datepartition, 
iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
-   * discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+   * <li>Standard daily: {@code iceberg.partition.value.format=yyyy-MM-dd, 
iceberg.filter.date=2025-04-03,
+   *     iceberg.lookback.days=3} → partitions: {@code 2025-04-03, 2025-04-02, 
2025-04-01}</li>
+   * <li>Reversed-date hourly: {@code 
iceberg.partition.value.format=dd-MM-yyyy-HH, iceberg.partition.hour=5}

Review Comment:
   Same here: `iceberg.partition.hour` is no longer used.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -80,23 +82,39 @@
  * iceberg.table.name=table1
  * iceberg.catalog.uri=ICEBERG_CATALOG_URI
  *
- * # Partition filtering with lookback - Static date (hourly partitions by 
default)
+ * # Partition filtering with lookback - Static date
  * iceberg.filter.enabled=true
- * iceberg.partition.column=datepartition  # Optional, defaults to 
"datepartition"
- * iceberg.filter.date=2025-04-01         # Date value (yyyy-MM-dd format)
+ * iceberg.partition.column=datepartition         # Optional, defaults to 
"datepartition"
+ * iceberg.filter.date=2025-04-01                 # Input date always in 
yyyy-MM-dd format

Review Comment:
   This comment can be updated to say that the input date is in the format of 
the pattern specified by `iceberg.partition.value.datetime.format`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
