Blazer-007 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2941463056
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -244,17 +284,28 @@ public Extractor<String, FileAwareInputStream>
getExtractor(WorkUnitState state)
* (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is
specified separately via
* {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
*
- * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is
appended to all partition values
- * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For
tables using standard daily partition
- * format ({@code yyyy-MM-dd}), set {@code
iceberg.hourly.partition.enabled=false}. Users should always provide
- * dates in standard {@code yyyy-MM-dd} format; the hour suffix is
automatically managed.
+ * <p><b>Partition Value Format:</b> The output partition value used in the
filter expression is controlled by
+ * {@code iceberg.partition.value.format}, a standard {@link
java.time.format.DateTimeFormatter} pattern.
+ * The {@code iceberg.filter.date} input is always provided in {@code
yyyy-MM-dd} form; the format governs
+ * how it is rendered as a partition value. {@code iceberg.partition.hour}
(0-23, default 0) supplies the
+ * hour component when the format includes {@code HH}. Examples:
+ * <ul>
+ * <li>{@code yyyy-MM-dd-HH} with hour=5 → {@code 2025-04-01-05}</li>
+ * <li>{@code dd-MM-yyyy-HH} with hour=0 → {@code 01-04-2025-00}</li>
+ * <li>{@code yyyyMMdd} → {@code 20250401} (daily,
compact)</li>
Review Comment:
This Javadoc needs an update, as `iceberg.partition.hour` is no longer used.
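For reference, a minimal sketch of how the hour component can come from the reference datetime itself rather than from a separate property, which is what the code comments elsewhere in this PR describe (midnight for an explicit date, the current clock-hour for `CURRENT_DATE`). The class and method names below are hypothetical and not part of `IcebergSource`; only the standard `java.time` API is used.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only; not part of IcebergSource.
class PartitionValueFormatSketch {

  // Renders a reference datetime as a partition value using a DateTimeFormatter pattern.
  // Any HH in the pattern is filled from the datetime's own hour field.
  static String render(LocalDateTime reference, String pattern) {
    return DateTimeFormatter.ofPattern(pattern).format(reference);
  }

  public static void main(String[] args) {
    // An explicit yyyy-MM-dd date resolves to midnight, so HH renders as 00.
    LocalDateTime midnight = LocalDate.parse("2025-04-01").atStartOfDay();
    System.out.println(render(midnight, "yyyy-MM-dd-HH")); // 2025-04-01-00
    System.out.println(render(midnight, "yyyyMMdd"));      // 20250401

    // A datetime that carries an hour (as LocalDateTime.now() would) embeds that hour.
    LocalDateTime fiveAm = LocalDateTime.of(2025, 4, 1, 5, 0);
    System.out.println(render(fiveAm, "dd-MM-yyyy-HH"));   // 01-04-2025-05
  }
}
```

If the Javadoc examples keep an hourly pattern, they could describe the hour this way instead of citing `iceberg.partition.hour`.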
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -280,68 +331,107 @@ private List<IcebergTable.FilePathWithPartition>
discoverPartitionFilePaths(Sour
Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
"iceberg.filter.date is required when iceberg.filter.enabled=true");
- // Handle CURRENT_DATE placeholder for flows
- if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
- dateValue = LocalDate.now().toString();
- log.info("Resolved {} placeholder to current date: {}",
CURRENT_DATE_PLACEHOLDER, dateValue);
- }
+ // Resolve the DateTimeFormatter used to render each partition value.
+ // resolvePartitionFormatter normalises both the new
iceberg.partition.value.datetime.format
+ // path and the legacy iceberg.hourly.partition.enabled path into a single
formatter.
+ DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);
- // Apply lookback period for date partitions
- // lookbackDays=1 (default) means copy only the specified date
- // lookbackDays=3 means copy specified date + 2 previous days (total 3
days)
- int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS,
DEFAULT_LOOKBACK_DAYS);
- List<String> values = Lists.newArrayList();
-
- if (lookbackDays >= 1) {
- log.info("Applying lookback period of {} days for date partition column
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
- // Check if hourly partitioning is enabled
- boolean isHourlyPartition =
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED,
DEFAULT_HOURLY_PARTITION_ENABLED);
-
- // Parse the date in yyyy-MM-dd format
- LocalDate start;
+ // Resolve the reference datetime for the filter.
+ // CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that
includes HH will
+ // embed the current clock-hour automatically. For a specific date
(yyyy-MM-dd) the time
+ // defaults to midnight (00:00).
+ LocalDateTime startDateTime;
+ if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+ startDateTime = LocalDateTime.now();
+ log.info("Resolved {} placeholder to current datetime: {}",
CURRENT_DATE_PLACEHOLDER, startDateTime);
+ } else {
try {
- start = LocalDate.parse(dateValue);
+ startDateTime = LocalDate.parse(dateValue).atStartOfDay();
} catch (java.time.format.DateTimeParseException e) {
String errorMsg = String.format(
"Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd.
Error: %s",
ICEBERG_FILTER_DATE, dateValue, e.getMessage());
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg, e);
}
+ }
Review Comment:
Use `LocalDate.parse(dateValue, partitionFormatter)` here, and please update
`errorMsg` as well so it no longer hard-codes `yyyy-MM-dd`.
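A rough sketch of what this suggestion could look like, under a few assumptions: the helper name is hypothetical, and the pattern string is passed alongside the formatter because `DateTimeFormatter` does not expose the pattern it was built from. Note that `LocalDate.parse(...).atStartOfDay()` discards any hour present in the input, so this only illustrates the date part.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper, for illustration only; not the PR's actual code.
class FilterDateParseSketch {

  // Parses dateValue with the configured pattern and reports that same pattern on failure.
  static LocalDateTime parseFilterDate(String dateValue, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    try {
      // atStartOfDay() sets the time to 00:00; an hour in the input would be dropped.
      return LocalDate.parse(dateValue, formatter).atStartOfDay();
    } catch (DateTimeParseException e) {
      // The expected format in the message is the configured pattern, not a fixed yyyy-MM-dd.
      String errorMsg = String.format(
          "Invalid date format for '%s': '%s'. Expected format: %s. Error: %s",
          "iceberg.filter.date", dateValue, pattern, e.getMessage());
      throw new IllegalArgumentException(errorMsg, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parseFilterDate("01-04-2025", "dd-MM-yyyy")); // 2025-04-01T00:00
  }
}
```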
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -280,68 +331,107 @@ private List<IcebergTable.FilePathWithPartition>
discoverPartitionFilePaths(Sour
Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
"iceberg.filter.date is required when iceberg.filter.enabled=true");
- // Handle CURRENT_DATE placeholder for flows
- if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
- dateValue = LocalDate.now().toString();
- log.info("Resolved {} placeholder to current date: {}",
CURRENT_DATE_PLACEHOLDER, dateValue);
- }
+ // Resolve the DateTimeFormatter used to render each partition value.
+ // resolvePartitionFormatter normalises both the new
iceberg.partition.value.datetime.format
+ // path and the legacy iceberg.hourly.partition.enabled path into a single
formatter.
+ DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);
- // Apply lookback period for date partitions
- // lookbackDays=1 (default) means copy only the specified date
- // lookbackDays=3 means copy specified date + 2 previous days (total 3
days)
- int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS,
DEFAULT_LOOKBACK_DAYS);
- List<String> values = Lists.newArrayList();
-
- if (lookbackDays >= 1) {
- log.info("Applying lookback period of {} days for date partition column
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
- // Check if hourly partitioning is enabled
- boolean isHourlyPartition =
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED,
DEFAULT_HOURLY_PARTITION_ENABLED);
-
- // Parse the date in yyyy-MM-dd format
- LocalDate start;
+ // Resolve the reference datetime for the filter.
+ // CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that
includes HH will
+ // embed the current clock-hour automatically. For a specific date
(yyyy-MM-dd) the time
+ // defaults to midnight (00:00).
+ LocalDateTime startDateTime;
+ if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+ startDateTime = LocalDateTime.now();
+ log.info("Resolved {} placeholder to current datetime: {}",
CURRENT_DATE_PLACEHOLDER, startDateTime);
+ } else {
try {
- start = LocalDate.parse(dateValue);
+ startDateTime = LocalDate.parse(dateValue).atStartOfDay();
} catch (java.time.format.DateTimeParseException e) {
String errorMsg = String.format(
"Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd.
Error: %s",
ICEBERG_FILTER_DATE, dateValue, e.getMessage());
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg, e);
}
+ }
- for (int i = 0; i < lookbackDays; i++) {
- String dateOnly = start.minusDays(i).toString();
- // Append hour suffix if hourly partitioning is enabled
- String partitionValue = isHourlyPartition ? dateOnly +
HOURLY_PARTITION_SUFFIX : dateOnly;
- values.add(partitionValue);
- log.info("Including partition: {}={}", datePartitionColumn,
partitionValue);
+ // Delegate partition value list + OR expression to
IcebergPartitionFilterGenerator.
+ // When iceberg.lookback.hours > 0 it takes precedence over
iceberg.lookback.days.
+ int lookbackHours = state.getPropAsInt(ICEBERG_LOOKBACK_HOURS,
DEFAULT_LOOKBACK_HOURS);
Review Comment:
Let's have an upper bound on `lookbackHours` as well. 48 (2 days) would be a
reasonable bound: we don't want users scheduling jobs every hour with a lookback
of more than 1 or 2 days.
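Something along these lines would do, as a sketch only. The constant name `MAX_LOOKBACK_HOURS` and the helper are hypothetical, and the check enforces just the upper bound; how values at or below zero are handled is left to the existing logic.

```java
// Hypothetical helper, for illustration only; not the PR's actual code.
class LookbackHoursBoundSketch {

  // Assumed cap of 48 hours (2 days), as proposed in this review comment.
  static final int MAX_LOOKBACK_HOURS = 48;

  // Returns lookbackHours unchanged when it is within the cap, otherwise fails fast.
  // Values <= 0 pass through untouched; their meaning is decided elsewhere.
  static int validateLookbackHours(int lookbackHours) {
    if (lookbackHours > MAX_LOOKBACK_HOURS) {
      throw new IllegalArgumentException(String.format(
          "iceberg.lookback.hours must be at most %d, but was %d",
          MAX_LOOKBACK_HOURS, lookbackHours));
    }
    return lookbackHours;
  }

  public static void main(String[] args) {
    System.out.println(validateLookbackHours(24)); // 24
  }
}
```

In `IcebergSource` the same check could be expressed with `Preconditions.checkArgument`, which the surrounding code already uses.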
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -244,17 +284,28 @@ public Extractor<String, FileAwareInputStream>
getExtractor(WorkUnitState state)
* (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is
specified separately via
* {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
*
- * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is
appended to all partition values
- * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For
tables using standard daily partition
- * format ({@code yyyy-MM-dd}), set {@code
iceberg.hourly.partition.enabled=false}. Users should always provide
- * dates in standard {@code yyyy-MM-dd} format; the hour suffix is
automatically managed.
+ * <p><b>Partition Value Format:</b> The output partition value used in the
filter expression is controlled by
+ * {@code iceberg.partition.value.format}, a standard {@link
java.time.format.DateTimeFormatter} pattern.
+ * The {@code iceberg.filter.date} input is always provided in {@code
yyyy-MM-dd} form; the format governs
+ * how it is rendered as a partition value. {@code iceberg.partition.hour}
(0-23, default 0) supplies the
+ * hour component when the format includes {@code HH}. Examples:
+ * <ul>
+ * <li>{@code yyyy-MM-dd-HH} with hour=5 → {@code 2025-04-01-05}</li>
+ * <li>{@code dd-MM-yyyy-HH} with hour=0 → {@code 01-04-2025-00}</li>
+ * <li>{@code yyyyMMdd} → {@code 20250401} (daily,
compact)</li>
+ * </ul>
+ * When {@code iceberg.partition.value.format} is set it supersedes {@code
iceberg.hourly.partition.enabled}.
+ * When absent, the legacy {@code iceberg.hourly.partition.enabled} / {@code
iceberg.partition.hour} behaviour
+ * is preserved for backward compatibility (default: appends {@code -00}
suffix).
*
* <p><b>Configuration Examples:</b>
* <ul>
- * <li>Static: {@code iceberg.partition.column=datepartition,
iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
- * discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+ * <li>Standard daily: {@code iceberg.partition.value.format=yyyy-MM-dd,
iceberg.filter.date=2025-04-03,
+ * iceberg.lookback.days=3} → partitions: {@code 2025-04-03, 2025-04-02,
2025-04-01}</li>
+ * <li>Reversed-date hourly: {@code
iceberg.partition.value.format=dd-MM-yyyy-HH, iceberg.partition.hour=5}
Review Comment:
Same here: this example still references `iceberg.partition.hour`, which is no
longer used.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -80,23 +82,39 @@
* iceberg.table.name=table1
* iceberg.catalog.uri=ICEBERG_CATALOG_URI
*
- * # Partition filtering with lookback - Static date (hourly partitions by
default)
+ * # Partition filtering with lookback - Static date
* iceberg.filter.enabled=true
- * iceberg.partition.column=datepartition # Optional, defaults to
"datepartition"
- * iceberg.filter.date=2025-04-01 # Date value (yyyy-MM-dd format)
+ * iceberg.partition.column=datepartition # Optional, defaults to
"datepartition"
+ * iceberg.filter.date=2025-04-01 # Input date always in
yyyy-MM-dd format
Review Comment:
This comment can be updated to say that the input date is in the format of the
pattern specified by `iceberg.partition.value.datetime.format`.