Copilot commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2903609872


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -286,62 +335,112 @@ private List<IcebergTable.FilePathWithPartition> 
discoverPartitionFilePaths(Sour
       log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
     }
 
-    // Apply lookback period for date partitions
-    // lookbackDays=1 (default) means copy only the specified date
-    // lookbackDays=3 means copy specified date + 2 previous days (total 3 
days)
-    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
-    List<String> values = Lists.newArrayList();
+    // Resolve hour (0-23) — shared by both daily and hourly lookback paths.
+    int hour = 0;
+    if (state.contains(ICEBERG_PARTITION_HOUR)) {
+      hour = state.getPropAsInt(ICEBERG_PARTITION_HOUR, 0);
+      Preconditions.checkArgument(hour >= 0 && hour <= 23,
+        String.format("iceberg.partition.hour must be between 0 and 23, got: 
%d", hour));
+    }
 
-    if (lookbackDays >= 1) {
-      log.info("Applying lookback period of {} days for date partition column 
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
+    // Resolve the DateTimeFormatter used to render each partition value.
+    // resolvePartitionFormatter normalises both the new 
iceberg.partition.value.format path
+    // and the legacy iceberg.hourly.partition.enabled path into a single 
formatter.
+    DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state, 
hour);
 
-      // Check if hourly partitioning is enabled
-      boolean isHourlyPartition = 
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, 
DEFAULT_HOURLY_PARTITION_ENABLED);
+    // Parse the input date — always expected in canonical yyyy-MM-dd form.
+    LocalDate startDate;
+    try {
+      startDate = LocalDate.parse(dateValue);
+    } catch (java.time.format.DateTimeParseException e) {
+      String errorMsg = String.format(
+        "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
+        ICEBERG_FILTER_DATE, dateValue, e.getMessage());
+      log.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg, e);
+    }
 
-      // Parse the date in yyyy-MM-dd format
-      LocalDate start;
-      try {
-        start = LocalDate.parse(dateValue);
-      } catch (java.time.format.DateTimeParseException e) {
-        String errorMsg = String.format(
-          "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
-          ICEBERG_FILTER_DATE, dateValue, e.getMessage());
-        log.error(errorMsg);
-        throw new IllegalArgumentException(errorMsg, e);
-      }
+    // Combine resolved date + hour into a single LocalDateTime for the 
generator.
+    LocalDateTime startDateTime = startDate.atTime(hour, 0);
+
+    // Delegate partition value list + OR expression to 
IcebergPartitionFilterGenerator.
+    // When iceberg.lookback.hours > 0 it takes precedence over 
iceberg.lookback.days.
+    int lookbackHours = state.getPropAsInt(ICEBERG_LOOKBACK_HOURS, 
DEFAULT_LOOKBACK_HOURS);
+    int lookbackDays  = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
 
-      for (int i = 0; i < lookbackDays; i++) {
-        String dateOnly = start.minusDays(i).toString();
-        // Append hour suffix if hourly partitioning is enabled
-        String partitionValue = isHourlyPartition ? dateOnly + 
HOURLY_PARTITION_SUFFIX : dateOnly;
-        values.add(partitionValue);
-        log.info("Including partition: {}={}", datePartitionColumn, 
partitionValue);
+    IcebergPartitionFilterGenerator.FilterResult filterResult;
+    if (lookbackHours > 0) {
+      if (state.contains(ICEBERG_LOOKBACK_DAYS)) {
+        log.warn("Both {} ({}) and {} ({}) are set; {} takes precedence",
+          ICEBERG_LOOKBACK_HOURS, lookbackHours, ICEBERG_LOOKBACK_DAYS, 
lookbackDays,
+          ICEBERG_LOOKBACK_HOURS);
       }
+      log.info("Hourly lookback: {} hours for column '{}' starting at {}",
+        lookbackHours, datePartitionColumn, startDateTime);
+      filterResult = IcebergPartitionFilterGenerator.forHours(
+        datePartitionColumn, startDateTime, lookbackHours, partitionFormatter);
     } else {
-      log.error("lookbackDays < 1, cannot apply lookback. lookbackDays={}", 
lookbackDays);
-      throw new IllegalArgumentException(String.format(
-        "lookback.days must be >= 1, got: %d", lookbackDays));
+      Preconditions.checkArgument(lookbackDays >= 1,
+        "iceberg.lookback.days must be >= 1, got: %d", lookbackDays);

Review Comment:
   `Preconditions.checkArgument` uses Guava's `%s`-style placeholders, so the 
`%d` here won't be substituted and the thrown message will be misleading (it 
typically appends the arg in brackets). Use `%s` or wrap with 
`String.format(...)` to include the numeric value correctly in the exception 
message.
   ```suggestion
           "iceberg.lookback.days must be >= 1, got: %s", lookbackDays);
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionFilterGeneratorTest.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import 
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionFilterGenerator.FilterResult;
+
+
+/** Unit tests for {@link IcebergPartitionFilterGenerator}. */
+public class IcebergPartitionFilterGeneratorTest {
+
+  private static final String PARTITION_COL = "datepartition";
+
+  // 
---------------------------------------------------------------------------
+  // forDays — daily stepping
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  public void testForDaysDailyFormat() {
+    // Format without hour field → plain date values
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 3, 0, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forDays(PARTITION_COL, start, 3, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("2025-04-03", "2025-04-02", "2025-04-01"),
+        "Daily format should produce plain date values, most-recent first");
+    Assert.assertNotNull(result.getFilterExpression(), "Filter expression must 
not be null");
+  }
+
+  @Test
+  public void testForDaysHourlyFormat() {
+    // Format includes HH → hour from start is embedded in every value
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 3, 5, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forDays(PARTITION_COL, start, 3, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("2025-04-03-05", "2025-04-02-05", "2025-04-01-05"),
+        "Each day should carry the same hour from start");
+  }
+
+  @Test
+  public void testForDaysReversedDateFormat() {
+    // dd-MM-yyyy-HH — reversed date with hour
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("dd-MM-yyyy-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 3, 0, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forDays(PARTITION_COL, start, 2, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("03-04-2025-00", "02-04-2025-00"),
+        "Reversed-date format should format each day correctly");
+  }
+
+  @Test
+  public void testForDaysCompactFormat() {
+    // yyyyMMdd — compact without any separator
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMdd");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 0, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forDays(PARTITION_COL, start, 1, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(), 
Collections.singletonList("20250401"),
+        "Compact format should produce a single compact date value");
+  }
+
+  @Test
+  public void testForDaysSingleDay() {
+    // lookbackDays=1 means exactly the start date, nothing more
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 0, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forDays(PARTITION_COL, start, 1, fmt);
+
+    Assert.assertEquals(result.getPartitionValues().size(), 1);
+    Assert.assertEquals(result.getPartitionValues().get(0), "2025-04-01");
+  }
+
+  @Test
+  public void testForDaysValuesAreImmutable() {
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    FilterResult result = IcebergPartitionFilterGenerator.forDays(
+        PARTITION_COL, LocalDateTime.of(2025, 4, 1, 0, 0), 2, fmt);
+    try {
+      result.getPartitionValues().add("intruder");
+      Assert.fail("Partition values list should be unmodifiable");
+    } catch (UnsupportedOperationException expected) {
+      // correct
+    }
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = ".*lookbackDays must be >= 1.*")
+  public void testForDaysZeroLookbackThrows() {
+    IcebergPartitionFilterGenerator.forDays(
+        PARTITION_COL, LocalDateTime.now(), 0, 
DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testForDaysNegativeLookbackThrows() {
+    IcebergPartitionFilterGenerator.forDays(
+        PARTITION_COL, LocalDateTime.now(), -3, 
DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // forHours — hourly stepping
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  public void testForHoursBasic() {
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 14, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forHours(PARTITION_COL, start, 3, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("2025-04-01-14", "2025-04-01-13", "2025-04-01-12"),
+        "Hourly lookback should step back one hour per entry");
+    Assert.assertNotNull(result.getFilterExpression(), "Filter expression must 
not be null");
+  }
+
+  @Test
+  public void testForHoursSingleHour() {
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 5, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forHours(PARTITION_COL, start, 1, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(), 
Collections.singletonList("2025-04-01-05"));
+  }
+
+  @Test
+  public void testForHoursCrossesDateBoundary() {
+    // Starting at 01:00 and looking back 3 hours crosses into the previous day
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 1, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forHours(PARTITION_COL, start, 3, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("2025-04-01-01", "2025-04-01-00", "2025-03-31-23"),
+        "Hourly stepping must cross midnight into the previous day correctly");
+  }
+
+  @Test
+  public void testForHoursCrossesMonthBoundary() {
+    // Starting at 2025-03-01 00:00 and looking back 2 hours crosses into 
February
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 3, 1, 0, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forHours(PARTITION_COL, start, 3, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("2025-03-01-00", "2025-02-28-23", "2025-02-28-22"),
+        "Hourly stepping must cross month boundaries correctly");
+  }
+
+  @Test
+  public void testForHoursReversedDateFormat() {
+    // Verify format-agnosticism for hourly path with reversed date
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("dd-MM-yyyy-HH");
+    LocalDateTime start = LocalDateTime.of(2025, 4, 1, 14, 0);
+
+    FilterResult result = 
IcebergPartitionFilterGenerator.forHours(PARTITION_COL, start, 2, fmt);
+
+    Assert.assertEquals(result.getPartitionValues(),
+        Arrays.asList("01-04-2025-14", "01-04-2025-13"));
+  }
+
+  @Test
+  public void testForHoursValuesAreImmutable() {
+    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+    FilterResult result = IcebergPartitionFilterGenerator.forHours(
+        PARTITION_COL, LocalDateTime.of(2025, 4, 1, 5, 0), 2, fmt);
+    try {
+      result.getPartitionValues().add("intruder");
+      Assert.fail("Partition values list should be unmodifiable");
+    } catch (UnsupportedOperationException expected) {
+      // correct
+    }
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = ".*lookbackHours must be >= 1.*")
+  public void testForHoursZeroLookbackThrows() {
+    IcebergPartitionFilterGenerator.forHours(
+        PARTITION_COL, LocalDateTime.now(), 0,
+        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testForHoursNegativeLookbackThrows() {
+    IcebergPartitionFilterGenerator.forHours(
+        PARTITION_COL, LocalDateTime.now(), -5,
+        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
+  }
+
+  // 
---------------------------------------------------------------------------
+  // buildOrExpression — raw expression builder
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  public void testBuildOrExpressionEmptyListReturnsAlwaysFalse() {
+    Expression expr = IcebergPartitionFilterGenerator.buildOrExpression(
+        PARTITION_COL, Collections.emptyList());
+    // Iceberg's alwaysFalse() is a singleton — compare by class name to stay 
decoupled
+    Assert.assertEquals(expr.getClass().getSimpleName(), "False",

Review Comment:
   The empty-values test currently asserts on `expr.getClass().getSimpleName()` 
and also leaves an unused `Expressions` import, which will fail compilation. 
Prefer asserting identity/equality with `Expressions.alwaysFalse()` (which also 
uses the existing import) so the test is both compile-clean and less brittle 
across Iceberg versions.
   ```suggestion
       // Iceberg's alwaysFalse() is a singleton — compare by identity for 
robustness
       Assert.assertSame(expr, Expressions.alwaysFalse(),
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -286,62 +335,112 @@ private List<IcebergTable.FilePathWithPartition> 
discoverPartitionFilePaths(Sour
       log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
     }
 
-    // Apply lookback period for date partitions
-    // lookbackDays=1 (default) means copy only the specified date
-    // lookbackDays=3 means copy specified date + 2 previous days (total 3 
days)
-    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
-    List<String> values = Lists.newArrayList();
+    // Resolve hour (0-23) — shared by both daily and hourly lookback paths.
+    int hour = 0;
+    if (state.contains(ICEBERG_PARTITION_HOUR)) {
+      hour = state.getPropAsInt(ICEBERG_PARTITION_HOUR, 0);
+      Preconditions.checkArgument(hour >= 0 && hour <= 23,
+        String.format("iceberg.partition.hour must be between 0 and 23, got: 
%d", hour));
+    }
 
-    if (lookbackDays >= 1) {
-      log.info("Applying lookback period of {} days for date partition column 
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
+    // Resolve the DateTimeFormatter used to render each partition value.
+    // resolvePartitionFormatter normalises both the new 
iceberg.partition.value.format path
+    // and the legacy iceberg.hourly.partition.enabled path into a single 
formatter.
+    DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state, 
hour);
 
-      // Check if hourly partitioning is enabled
-      boolean isHourlyPartition = 
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, 
DEFAULT_HOURLY_PARTITION_ENABLED);
+    // Parse the input date — always expected in canonical yyyy-MM-dd form.
+    LocalDate startDate;
+    try {
+      startDate = LocalDate.parse(dateValue);
+    } catch (java.time.format.DateTimeParseException e) {
+      String errorMsg = String.format(
+        "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
+        ICEBERG_FILTER_DATE, dateValue, e.getMessage());
+      log.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg, e);
+    }
 
-      // Parse the date in yyyy-MM-dd format
-      LocalDate start;
-      try {
-        start = LocalDate.parse(dateValue);
-      } catch (java.time.format.DateTimeParseException e) {
-        String errorMsg = String.format(
-          "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
-          ICEBERG_FILTER_DATE, dateValue, e.getMessage());
-        log.error(errorMsg);
-        throw new IllegalArgumentException(errorMsg, e);
-      }
+    // Combine resolved date + hour into a single LocalDateTime for the 
generator.
+    LocalDateTime startDateTime = startDate.atTime(hour, 0);
+
+    // Delegate partition value list + OR expression to 
IcebergPartitionFilterGenerator.
+    // When iceberg.lookback.hours > 0 it takes precedence over 
iceberg.lookback.days.
+    int lookbackHours = state.getPropAsInt(ICEBERG_LOOKBACK_HOURS, 
DEFAULT_LOOKBACK_HOURS);
+    int lookbackDays  = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
 

Review Comment:
   `iceberg.lookback.hours` is only checked for `> 0`. If a user sets a 
negative value, it silently falls back to the daily lookback path instead of 
failing fast. Add validation to reject values < 0 (and consider whether 0 
should be treated as invalid when the property is explicitly set).
   ```suggestion
       // When iceberg.lookback.hours > 0 it takes precedence over 
iceberg.lookback.days.
       boolean lookbackHoursConfigured = state.contains(ICEBERG_LOOKBACK_HOURS);
       int lookbackHours = state.getPropAsInt(ICEBERG_LOOKBACK_HOURS, 
DEFAULT_LOOKBACK_HOURS);
       int lookbackDays  = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
   
       if (lookbackHoursConfigured) {
         // When explicitly configured, hourly lookback must be at least 1 hour.
         Preconditions.checkArgument(lookbackHours >= 1,
             "iceberg.lookback.hours must be >= 1 when set, got: %d", 
lookbackHours);
       }
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java:
##########
@@ -1092,6 +1092,597 @@ public void testHourlyPartitionDateFormatWithLookback() 
throws Exception {
     }
   }
 
+  // ---- Tests for iceberg.partition.value.format (generic partition value 
formatting) ----
+
+  @Test
+  public void testPartitionValueFormatDaily() throws Exception {
+    // iceberg.partition.value.format=yyyy-MM-dd should produce plain date 
partitions (no hour)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01",
+      "yyyy-MM-dd format should produce plain date without any hour suffix");
+  }
+
+  @Test
+  public void testPartitionValueFormatHourlyStandard() throws Exception {
+    // iceberg.partition.value.format=yyyy-MM-dd-HH with hour=5 → 
"2025-04-01-05"
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01-05"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01-05",
+      "yyyy-MM-dd-HH format with hour=5 should produce '2025-04-01-05'");
+  }
+
+  @Test
+  public void testPartitionValueFormatReversedDate() throws Exception {
+    // iceberg.partition.value.format=dd-MM-yyyy-HH with default hour → 
"01-04-2025-00"
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    // no iceberg.partition.hour set → defaults to hour 0
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "01-04-2025-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "01-04-2025-00",
+      "dd-MM-yyyy-HH format should produce reversed-date partition value");
+  }
+
+  @Test
+  public void testPartitionValueFormatCompact() throws Exception {
+    // iceberg.partition.value.format=yyyyMMdd → "20250401" (no separators, no 
hour)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyyMMdd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "20250401"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "20250401",
+      "yyyyMMdd format should produce compact date without separators");
+  }
+
+  @Test
+  public void testPartitionValueFormatWithLookback() throws Exception {
+    // iceberg.partition.value.format=dd-MM-yyyy-HH with lookback=3 → 3 
reversed-date partitions
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "03-04-2025-14"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "02-04-2025-14"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "01-04-2025-14"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered = (List<FilePathWithPartition>) 
m.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    String[] dates = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(dates.length, 3);
+    Assert.assertEquals(dates[0], "03-04-2025-14", "Day 0 should be 
reversed-date with hour 14");
+    Assert.assertEquals(dates[1], "02-04-2025-14", "Day 1 should be 
reversed-date with hour 14");
+    Assert.assertEquals(dates[2], "01-04-2025-14", "Day 2 should be 
reversed-date with hour 14");
+  }
+
+  @Test
+  public void testPartitionValueFormatSupersedeLegacyHourlyFlag() throws 
Exception {
+    // When iceberg.partition.value.format is set, 
iceberg.hourly.partition.enabled=false is ignored.
+    // Format "yyyy-MM-dd-HH" must still produce hour suffix regardless of the 
legacy flag.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"false"); // legacy flag — should be ignored
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01-00",
+      "iceberg.partition.value.format must supersede 
iceberg.hourly.partition.enabled=false");
+  }
+
+  @Test
+  public void testInvalidPartitionValueFormatThrows() throws Exception {
+    // An invalid DateTimeFormatter pattern should throw 
IllegalArgumentException immediately
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"not-a-valid-pattern-Q!!");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw for invalid DateTimeFormatter pattern");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for invalid format pattern");
+      
Assert.assertTrue(e.getCause().getMessage().contains(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT),
+        "Error message should reference the config key 
iceberg.partition.value.format");
+    }
+  }
+
+  @Test
+  public void testPartitionValueFormatCustomColumnName() throws Exception {
+    // Verify that iceberg.partition.column works correctly with 
iceberg.partition.value.format
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_COLUMN, 
"event_date"); // non-default column
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyyMMdd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("event_date", "20250401"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_KEY), 
"event_date",
+      "Custom partition column name should be stored in state");
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "20250401",
+      "Compact date format should be applied to custom partition column");
+  }
+
+  // ---- Tests for iceberg.partition.hour (hour-level control) ----
+
+  @Test
+  public void testSpecificHourPartitionSingleDigit() throws Exception {
+    // When iceberg.partition.hour=5, partition value should be yyyy-MM-dd-05 
(zero-padded)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-05"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01-05",
+      "Single-digit hour 5 should be zero-padded to -05 suffix");
+  }
+
+  @Test
+  public void testSpecificHourPartitionDoubleDigit() throws Exception {
+    // When iceberg.partition.hour=14, partition value should be yyyy-MM-dd-14
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-14"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01-14",
+      "Double-digit hour 14 should produce -14 suffix");
+  }
+
+  @Test
+  public void testSpecificHourPartitionWithLookback() throws Exception {
+    // When iceberg.partition.hour=3 and lookback=3, all 3 partition values 
should carry -03 suffix
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "3");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-03-03"), 1000L),
+      new FilePathWithPartition(
+        "/data/file2.parquet", createPartitionMap("datepartition", 
"2025-04-02-03"), 1000L),
+      new FilePathWithPartition(
+        "/data/file3.parquet", createPartitionMap("datepartition", 
"2025-04-01-03"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(filesFor3Days);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(dates[0], "2025-04-03-03", "Day 0 should have -03 
suffix");
+    Assert.assertEquals(dates[1], "2025-04-02-03", "Day 1 should have -03 
suffix");
+    Assert.assertEquals(dates[2], "2025-04-01-03", "Day 2 should have -03 
suffix");
+  }
+
+  @Test
+  public void testInvalidHourTooHighThrowsException() throws Exception {
+    // iceberg.partition.hour=25 is out of range (valid: 0-23), should throw 
IllegalArgumentException
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "25");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for out-of-range hour value 25");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for invalid hour value");
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.partition.hour 
must be between 0 and 23"),
+        "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testInvalidHourNegativeThrowsException() throws Exception {
+    // iceberg.partition.hour=-1 is out of range, should throw 
IllegalArgumentException
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "-1");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for negative hour value -1");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for negative hour value");
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.partition.hour 
must be between 0 and 23"),
+        "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testHourlyDisabledIgnoresHourConfig() throws Exception {
+    // When iceberg.hourly.partition.enabled=false, iceberg.partition.hour 
should have no effect
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"false");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5"); // 
should be ignored
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01",
+      "When hourly partition disabled, no hour suffix should be appended 
regardless of iceberg.partition.hour");
+  }
+
+  @Test
+  public void testSpecificHourZeroEquivalentToDefault() throws Exception {
+    // Explicitly setting iceberg.partition.hour=0 should produce the same 
result as the default (-00)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "0");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "2025-04-01-00",
+      "Explicitly setting hour=0 should produce the same -00 suffix as the 
default");
+  }
+
+  // ---- Tests for iceberg.lookback.hours (hourly lookback via 
IcebergPartitionFilterGenerator) ----
+
+  @Test
+  public void testLookbackHoursBasic() throws Exception {
+    // iceberg.lookback.hours=3 should produce 3 hourly partition values 
stepping back from start hour
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-14"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-13"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "2025-04-01-12"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 files for 3 
hourly partitions");
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(values[0], "2025-04-01-14", "Hour 0: 14");
+    Assert.assertEquals(values[1], "2025-04-01-13", "Hour 1: 13");
+    Assert.assertEquals(values[2], "2025-04-01-12", "Hour 2: 12");
+  }
+
+  @Test
+  public void testLookbackHoursCrossesDateBoundary() throws Exception {
+    // lookbackHours=3 starting at hour 1 should cross midnight into the 
previous day
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "1");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-01"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-00"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "2025-03-31-23"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values[0], "2025-04-01-01");
+    Assert.assertEquals(values[1], "2025-04-01-00");
+    Assert.assertEquals(values[2], "2025-03-31-23", "Should cross midnight 
into previous day");
+  }
+
+  @Test
+  public void testLookbackHoursTakesPrecedenceOverLookbackDays() throws 
Exception {
+    // When both lookback.hours and lookback.days are set, hours must win
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "10");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "5"); // 
should be ignored
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-10"), 500L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-09"), 500L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    // Should produce exactly 2 hourly values, not 5 daily values
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values.length, 2,
+      "lookback.hours should take precedence, producing 2 values not 5");
+    Assert.assertEquals(values[0], "2025-04-01-10");
+    Assert.assertEquals(values[1], "2025-04-01-09");
+  }
+
+  @Test
+  public void testLookbackHoursWithReversedDateFormat() throws Exception {
+    // Hourly lookback with dd-MM-yyyy-HH format should produce reversed-date 
hourly values
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "01-04-2025-05"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "01-04-2025-04"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values[0], "01-04-2025-05");
+    Assert.assertEquals(values[1], "01-04-2025-04");
+  }
+
+  @Test
+  public void testLookbackHoursWithCurrentDate() throws Exception {
+    // CURRENT_DATE placeholder works with hourly lookback
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "6");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    sourceState = new SourceState(new State(properties));
+
+    String today = java.time.LocalDate.now().toString();
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", today + "-06"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", today + "-05"), 1000L)

Review Comment:
   This test can be flaky around midnight because it uses `CURRENT_DATE` 
(resolved via `LocalDate.now()` inside production code) and also captures 
`today` via `LocalDate.now()` in the test. If the date rolls over between those 
calls, assertions will fail. Consider asserting on structure (e.g., values end 
with `-06`/`-05` and share the same date prefix) rather than an exact date, or 
refactor `IcebergSource` to allow injecting a `Clock` for deterministic testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to