debabhishek53 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2936965333
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -129,7 +149,25 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
public static final String ICEBERG_FILE_PARTITION_PATH =
"iceberg.file.partition.path";
public static final String ICEBERG_HOURLY_PARTITION_ENABLED =
"iceberg.hourly.partition.enabled";
public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true;
- private static final String HOURLY_PARTITION_SUFFIX = "-00";
+ public static final String ICEBERG_PARTITION_HOUR =
"iceberg.partition.hour"; // specific hour (0-23) used with hourly/format-based
partition, defaults to 0
+ /**
+ * Optional {@link DateTimeFormatter} pattern controlling how the partition
value is rendered.
+ * The input date ({@code iceberg.filter.date}) is always supplied in {@code
yyyy-MM-dd} form;
+ * this pattern governs the <em>output</em> string used in the filter
expression.
+ *
+ * <p>Examples:
+ * <ul>
+ * <li>{@code yyyy-MM-dd} → {@code 2025-04-01} (daily, no
hour)</li>
+ * <li>{@code yyyy-MM-dd-HH} → {@code 2025-04-01-05} (hourly, hour
from {@code iceberg.partition.hour})</li>
+ * <li>{@code dd-MM-yyyy-HH} → {@code 01-04-2025-00} (reversed-date
hourly)</li>
+ * <li>{@code yyyyMMdd} → {@code 20250401} (compact
daily)</li>
+ * </ul>
+ *
+ * <p>When this property is set it supersedes {@code
iceberg.hourly.partition.enabled}.
+ * When absent the legacy {@code iceberg.hourly.partition.enabled} / {@code
iceberg.partition.hour}
+ * behaviour is preserved for backward compatibility.
+ */
+ public static final String ICEBERG_PARTITION_VALUE_FORMAT =
"iceberg.partition.value.format";
Review Comment:
Makes sense; updated the name.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -286,62 +335,112 @@ private List<IcebergTable.FilePathWithPartition>
discoverPartitionFilePaths(Sour
log.info("Resolved {} placeholder to current date: {}",
CURRENT_DATE_PLACEHOLDER, dateValue);
}
- // Apply lookback period for date partitions
- // lookbackDays=1 (default) means copy only the specified date
- // lookbackDays=3 means copy specified date + 2 previous days (total 3
days)
- int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS,
DEFAULT_LOOKBACK_DAYS);
- List<String> values = Lists.newArrayList();
+ // Resolve hour (0-23) — shared by both daily and hourly lookback paths.
+ int hour = 0;
+ if (state.contains(ICEBERG_PARTITION_HOUR)) {
+ hour = state.getPropAsInt(ICEBERG_PARTITION_HOUR, 0);
+ Preconditions.checkArgument(hour >= 0 && hour <= 23,
+ String.format("iceberg.partition.hour must be between 0 and 23, got:
%d", hour));
+ }
+
+ // Resolve the DateTimeFormatter used to render each partition value.
+ // resolvePartitionFormatter normalises both the new
iceberg.partition.value.format path
+ // and the legacy iceberg.hourly.partition.enabled path into a single
formatter.
+ DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state,
hour);
- if (lookbackDays >= 1) {
- log.info("Applying lookback period of {} days for date partition column
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
+ // Parse the input date — always expected in canonical yyyy-MM-dd form.
+ LocalDate startDate;
+ try {
+ startDate = LocalDate.parse(dateValue);
+ } catch (java.time.format.DateTimeParseException e) {
Review Comment:
Resolved
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]