debabhishek53 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2904064195


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java:
##########
@@ -1092,6 +1092,597 @@ public void testHourlyPartitionDateFormatWithLookback() 
throws Exception {
     }
   }
 
+  // ---- Tests for iceberg.partition.value.format (generic partition value 
formatting) ----
+
+  @Test
+  public void testPartitionValueFormatDaily() throws Exception {
+    // iceberg.partition.value.format=yyyy-MM-dd should produce plain date 
partitions (no hour)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01",
+      "yyyy-MM-dd format should produce plain date without any hour suffix");
+  }
+
+  @Test
+  public void testPartitionValueFormatHourlyStandard() throws Exception {
+    // iceberg.partition.value.format=yyyy-MM-dd-HH with hour=5 → 
"2025-04-01-05"
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01-05"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01-05",
+      "yyyy-MM-dd-HH format with hour=5 should produce '2025-04-01-05'");
+  }
+
+  @Test
+  public void testPartitionValueFormatReversedDate() throws Exception {
+    // iceberg.partition.value.format=dd-MM-yyyy-HH with default hour → 
"01-04-2025-00"
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    // no iceberg.partition.hour set → defaults to hour 0
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "01-04-2025-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "01-04-2025-00",
+      "dd-MM-yyyy-HH format should produce reversed-date partition value");
+  }
+
+  @Test
+  public void testPartitionValueFormatCompact() throws Exception {
+    // iceberg.partition.value.format=yyyyMMdd → "20250401" (no separators, no 
hour)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyyMMdd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "20250401"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "20250401",
+      "yyyyMMdd format should produce compact date without separators");
+  }
+
+  @Test
+  public void testPartitionValueFormatWithLookback() throws Exception {
+    // iceberg.partition.value.format=dd-MM-yyyy-HH with lookback=3 → 3 
reversed-date partitions
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "03-04-2025-14"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "02-04-2025-14"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "01-04-2025-14"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered = (List<FilePathWithPartition>) 
m.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    String[] dates = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(dates.length, 3);
+    Assert.assertEquals(dates[0], "03-04-2025-14", "Day 0 should be 
reversed-date with hour 14");
+    Assert.assertEquals(dates[1], "02-04-2025-14", "Day 1 should be 
reversed-date with hour 14");
+    Assert.assertEquals(dates[2], "01-04-2025-14", "Day 2 should be 
reversed-date with hour 14");
+  }
+
+  @Test
+  public void testPartitionValueFormatSupersedeLegacyHourlyFlag() throws 
Exception {
+    // When iceberg.partition.value.format is set, 
iceberg.hourly.partition.enabled=false is ignored.
+    // Format "yyyy-MM-dd-HH" must still produce hour suffix regardless of the 
legacy flag.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"false"); // legacy flag — should be ignored
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("datepartition", "2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "2025-04-01-00",
+      "iceberg.partition.value.format must supersede 
iceberg.hourly.partition.enabled=false");
+  }
+
+  @Test
+  public void testInvalidPartitionValueFormatThrows() throws Exception {
+    // An invalid DateTimeFormatter pattern should throw 
IllegalArgumentException immediately
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"not-a-valid-pattern-Q!!");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw for invalid DateTimeFormatter pattern");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for invalid format pattern");
+      
Assert.assertTrue(e.getCause().getMessage().contains(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT),
+        "Error message should reference the config key 
iceberg.partition.value.format");
+    }
+  }
+
+  @Test
+  public void testPartitionValueFormatCustomColumnName() throws Exception {
+    // Verify that iceberg.partition.column works correctly with 
iceberg.partition.value.format
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_COLUMN, 
"event_date"); // non-default column
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyyMMdd");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/file1.parquet", 
createPartitionMap("event_date", "20250401"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_KEY), 
"event_date",
+      "Custom partition column name should be stored in state");
+    
Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
 "20250401",
+      "Compact date format should be applied to custom partition column");
+  }
+
+  // ---- Tests for iceberg.partition.hour (hour-level control) ----
+
+  @Test
+  public void testSpecificHourPartitionSingleDigit() throws Exception {
+    // When iceberg.partition.hour=5, partition value should be yyyy-MM-dd-05 
(zero-padded)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-05"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01-05",
+      "Single-digit hour 5 should be zero-padded to -05 suffix");
+  }
+
+  @Test
+  public void testSpecificHourPartitionDoubleDigit() throws Exception {
+    // When iceberg.partition.hour=14, partition value should be yyyy-MM-dd-14
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-14"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01-14",
+      "Double-digit hour 14 should produce -14 suffix");
+  }
+
+  @Test
+  public void testSpecificHourPartitionWithLookback() throws Exception {
+    // When iceberg.partition.hour=3 and lookback=3, all 3 partition values 
should carry -03 suffix
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "3");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-03-03"), 1000L),
+      new FilePathWithPartition(
+        "/data/file2.parquet", createPartitionMap("datepartition", 
"2025-04-02-03"), 1000L),
+      new FilePathWithPartition(
+        "/data/file3.parquet", createPartitionMap("datepartition", 
"2025-04-01-03"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(filesFor3Days);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(dates[0], "2025-04-03-03", "Day 0 should have -03 
suffix");
+    Assert.assertEquals(dates[1], "2025-04-02-03", "Day 1 should have -03 
suffix");
+    Assert.assertEquals(dates[2], "2025-04-01-03", "Day 2 should have -03 
suffix");
+  }
+
+  @Test
+  public void testInvalidHourTooHighThrowsException() throws Exception {
+    // iceberg.partition.hour=25 is out of range (valid: 0-23), should throw 
IllegalArgumentException
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "25");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for out-of-range hour value 25");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for invalid hour value");
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.partition.hour 
must be between 0 and 23"),
+        "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testInvalidHourNegativeThrowsException() throws Exception {
+    // iceberg.partition.hour=-1 is out of range, should throw 
IllegalArgumentException
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "-1");
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for negative hour value -1");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for negative hour value");
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.partition.hour 
must be between 0 and 23"),
+        "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testHourlyDisabledIgnoresHourConfig() throws Exception {
+    // When iceberg.hourly.partition.enabled=false, iceberg.partition.hour 
should have no effect
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"false");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5"); // 
should be ignored
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01",
+      "When hourly partition disabled, no hour suffix should be appended 
regardless of iceberg.partition.hour");
+  }
+
+  @Test
+  public void testSpecificHourZeroEquivalentToDefault() throws Exception {
+    // Explicitly setting iceberg.partition.hour=0 should produce the same 
result as the default (-00)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "0");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "2025-04-01-00",
+      "Explicitly setting hour=0 should produce the same -00 suffix as the 
default");
+  }
+
+  // ---- Tests for iceberg.lookback.hours (hourly lookback via 
IcebergPartitionFilterGenerator) ----
+
+  @Test
+  public void testLookbackHoursBasic() throws Exception {
+    // iceberg.lookback.hours=3 should produce 3 hourly partition values 
stepping back from start hour
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-14"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-13"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "2025-04-01-12"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 files for 3 
hourly partitions");
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(values[0], "2025-04-01-14", "Hour 0: 14");
+    Assert.assertEquals(values[1], "2025-04-01-13", "Hour 1: 13");
+    Assert.assertEquals(values[2], "2025-04-01-12", "Hour 2: 12");
+  }
+
+  @Test
+  public void testLookbackHoursCrossesDateBoundary() throws Exception {
+    // lookbackHours=3 starting at hour 1 should cross midnight into the 
previous day
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "1");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-01"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-00"), 1000L),
+      new FilePathWithPartition("/data/f3.parquet", 
createPartitionMap("datepartition", "2025-03-31-23"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values[0], "2025-04-01-01");
+    Assert.assertEquals(values[1], "2025-04-01-00");
+    Assert.assertEquals(values[2], "2025-03-31-23", "Should cross midnight 
into previous day");
+  }
+
+  @Test
+  public void testLookbackHoursTakesPrecedenceOverLookbackDays() throws 
Exception {
+    // When both lookback.hours and lookback.days are set, hours must win
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "10");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "5"); // 
should be ignored
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "2025-04-01-10"), 500L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "2025-04-01-09"), 500L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    // Should produce exactly 2 hourly values, not 5 daily values
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values.length, 2,
+      "lookback.hours should take precedence, producing 2 values not 5");
+    Assert.assertEquals(values[0], "2025-04-01-10");
+    Assert.assertEquals(values[1], "2025-04-01-09");
+  }
+
+  @Test
+  public void testLookbackHoursWithReversedDateFormat() throws Exception {
+    // Hourly lookback with dd-MM-yyyy-HH format should produce reversed-date 
hourly values
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", "01-04-2025-05"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", "01-04-2025-04"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    String[] values = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES).split(",");
+    Assert.assertEquals(values[0], "01-04-2025-05");
+    Assert.assertEquals(values[1], "01-04-2025-04");
+  }
+
+  @Test
+  public void testLookbackHoursWithCurrentDate() throws Exception {
+    // CURRENT_DATE placeholder works with hourly lookback
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, 
"yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "6");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    sourceState = new SourceState(new State(properties));
+
+    String today = java.time.LocalDate.now().toString();
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition("/data/f1.parquet", 
createPartitionMap("datepartition", today + "-06"), 1000L),
+      new FilePathWithPartition("/data/f2.parquet", 
createPartitionMap("datepartition", today + "-05"), 1000L)

Review Comment:
   Updated the code for flakiness 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to