debabhishek53 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2904064195
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java:
##########
@@ -1092,6 +1092,597 @@ public void testHourlyPartitionDateFormatWithLookback() throws Exception {
}
}
+  // ---- Tests for iceberg.partition.value.format (generic partition value formatting) ----
+
+  @Test
+  public void testPartitionValueFormatDaily() throws Exception {
+    // A date-only pattern (yyyy-MM-dd) should yield a single plain date value with no hour component.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "2025-04-01",
+        "yyyy-MM-dd format should produce plain date without any hour suffix");
+  }
+
+  @Test
+  public void testPartitionValueFormatHourlyStandard() throws Exception {
+    // Pattern yyyy-MM-dd-HH combined with iceberg.partition.hour=5 should render as "2025-04-01-05".
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd-HH");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01-05"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "2025-04-01-05",
+        "yyyy-MM-dd-HH format with hour=5 should produce '2025-04-01-05'");
+  }
+
+  @Test
+  public void testPartitionValueFormatReversedDate() throws Exception {
+    // Pattern dd-MM-yyyy-HH with iceberg.partition.hour left unset is expected to render hour 00: "01-04-2025-00".
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "dd-MM-yyyy-HH");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "01-04-2025-00"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "01-04-2025-00",
+        "dd-MM-yyyy-HH format should produce reversed-date partition value");
+  }
+
+  @Test
+  public void testPartitionValueFormatCompact() throws Exception {
+    // Compact pattern yyyyMMdd should render "20250401": no separators and no hour component.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyyMMdd");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "20250401"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "20250401",
+        "yyyyMMdd format should produce compact date without separators");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // Method.invoke returns Object; the List<FilePathWithPartition> cast is unchecked.
+  public void testPartitionValueFormatWithLookback() throws Exception {
+    // iceberg.partition.value.format=dd-MM-yyyy-HH with lookback=3 -> 3 reversed-date partitions,
+    // expected in order starting from the filter date and stepping back one day at a time.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+        new FilePathWithPartition("/data/f1.parquet", createPartitionMap("datepartition", "03-04-2025-14"), 1000L),
+        new FilePathWithPartition("/data/f2.parquet", createPartitionMap("datepartition", "02-04-2025-14"), 1000L),
+        new FilePathWithPartition("/data/f3.parquet", createPartitionMap("datepartition", "01-04-2025-14"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+        (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with lookback=3");
+
+    // Fail with a descriptive assertion (instead of an NPE from split) if the property was never written.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(dates[0], "03-04-2025-14", "Day 0 should be reversed-date with hour 14");
+    Assert.assertEquals(dates[1], "02-04-2025-14", "Day 1 should be reversed-date with hour 14");
+    Assert.assertEquals(dates[2], "01-04-2025-14", "Day 2 should be reversed-date with hour 14");
+  }
+
+  @Test
+  public void testPartitionValueFormatSupersedeLegacyHourlyFlag() throws Exception {
+    // With iceberg.partition.value.format present, the legacy iceberg.hourly.partition.enabled=false must not
+    // suppress the hour: pattern yyyy-MM-dd-HH is still expected to render an hour suffix.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "false"); // legacy flag, expected to be ignored
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd-HH");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01-00"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "2025-04-01-00",
+        "iceberg.partition.value.format must supersede iceberg.hourly.partition.enabled=false");
+  }
+
+  @Test
+  public void testInvalidPartitionValueFormatThrows() throws Exception {
+    // A pattern that DateTimeFormatter cannot parse should be rejected with an IllegalArgumentException
+    // whose message names the offending config key.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "not-a-valid-pattern-Q!!");
+    sourceState = new SourceState(new State(properties));
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw for invalid DateTimeFormatter pattern");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Method.invoke wraps whatever the target threw; assert on the unwrapped cause and report it on mismatch.
+      Throwable cause = e.getCause();
+      Assert.assertTrue(cause instanceof IllegalArgumentException,
+          "Should throw IllegalArgumentException for invalid format pattern, but got: " + cause);
+      // Guard against a null message so a missing message is an assertion failure, not an NPE.
+      Assert.assertNotNull(cause.getMessage(), "IllegalArgumentException should carry a message");
+      Assert.assertTrue(cause.getMessage().contains(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT),
+          "Error message should reference the config key iceberg.partition.value.format");
+    }
+  }
+
+  @Test
+  public void testPartitionValueFormatCustomColumnName() throws Exception {
+    // A non-default iceberg.partition.column should be honoured alongside iceberg.partition.value.format.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyyMMdd");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_COLUMN, "event_date"); // non-default column
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("event_date", "20250401"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String partitionKey = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_KEY);
+    Assert.assertEquals(partitionKey, "event_date",
+        "Custom partition column name should be stored in state");
+
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertEquals(partitionValues, "20250401",
+        "Compact date format should be applied to custom partition column");
+  }
+
+  // ---- Tests for iceberg.partition.hour (hour-level control) ----
+
+  @Test
+  public void testSpecificHourPartitionSingleDigit() throws Exception {
+    // iceberg.partition.hour=5 is expected to be zero-padded, giving yyyy-MM-dd-05.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01-05"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String actual = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(actual, "Partition values should be set");
+    Assert.assertEquals(actual, "2025-04-01-05", "Single-digit hour 5 should be zero-padded to -05 suffix");
+  }
+
+  @Test
+  public void testSpecificHourPartitionDoubleDigit() throws Exception {
+    // A two-digit hour (14) should appear verbatim as the suffix: yyyy-MM-dd-14.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01-14"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String actual = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(actual, "Partition values should be set");
+    Assert.assertEquals(actual, "2025-04-01-14", "Double-digit hour 14 should produce -14 suffix");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // Method.invoke returns Object; the List<FilePathWithPartition> cast is unchecked.
+  public void testSpecificHourPartitionWithLookback() throws Exception {
+    // With iceberg.partition.hour=3 and lookback=3, every one of the 3 partition values should carry the -03 suffix.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "3");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-03-03"), 1000L),
+        new FilePathWithPartition("/data/file2.parquet", createPartitionMap("datepartition", "2025-04-02-03"), 1000L),
+        new FilePathWithPartition("/data/file3.parquet", createPartitionMap("datepartition", "2025-04-01-03"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(filesFor3Days);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+        (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with lookback=3");
+
+    // Fail with a descriptive assertion (instead of an NPE from split) if the property was never written.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(dates[0], "2025-04-03-03", "Day 0 should have -03 suffix");
+    Assert.assertEquals(dates[1], "2025-04-02-03", "Day 1 should have -03 suffix");
+    Assert.assertEquals(dates[2], "2025-04-01-03", "Day 2 should have -03 suffix");
+  }
+
+  @Test
+  public void testInvalidHourTooHighThrowsException() throws Exception {
+    // iceberg.partition.hour=25 is out of range (valid: 0-23) and should be rejected with IllegalArgumentException.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "25");
+    sourceState = new SourceState(new State(properties));
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for out-of-range hour value 25");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Method.invoke wraps whatever the target threw; assert on the unwrapped cause and report it on mismatch.
+      Throwable cause = e.getCause();
+      Assert.assertTrue(cause instanceof IllegalArgumentException,
+          "Should throw IllegalArgumentException for invalid hour value, but got: " + cause);
+      // Guard against a null message so a missing message is an assertion failure, not an NPE.
+      Assert.assertNotNull(cause.getMessage(), "IllegalArgumentException should carry a message");
+      Assert.assertTrue(cause.getMessage().contains("iceberg.partition.hour must be between 0 and 23"),
+          "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testInvalidHourNegativeThrowsException() throws Exception {
+    // iceberg.partition.hour=-1 is out of range and should be rejected with IllegalArgumentException.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "-1");
+    sourceState = new SourceState(new State(properties));
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for negative hour value -1");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Method.invoke wraps whatever the target threw; assert on the unwrapped cause and report it on mismatch.
+      Throwable cause = e.getCause();
+      Assert.assertTrue(cause instanceof IllegalArgumentException,
+          "Should throw IllegalArgumentException for negative hour value, but got: " + cause);
+      // Guard against a null message so a missing message is an assertion failure, not an NPE.
+      Assert.assertNotNull(cause.getMessage(), "IllegalArgumentException should carry a message");
+      Assert.assertTrue(cause.getMessage().contains("iceberg.partition.hour must be between 0 and 23"),
+          "Error message should describe valid hour range");
+    }
+  }
+
+  @Test
+  public void testHourlyDisabledIgnoresHourConfig() throws Exception {
+    // With iceberg.hourly.partition.enabled=false, iceberg.partition.hour is expected to have no effect.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "false");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5"); // expected to be ignored
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    String actual = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(actual, "Partition values should be set");
+    Assert.assertEquals(actual, "2025-04-01",
+        "When hourly partition disabled, no hour suffix should be appended regardless of iceberg.partition.hour");
+  }
+
+  @Test
+  public void testSpecificHourZeroEquivalentToDefault() throws Exception {
+    // An explicit iceberg.partition.hour=0 should give the same -00 suffix as leaving the hour unset.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "0");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+        new FilePathWithPartition("/data/file1.parquet", createPartitionMap("datepartition", "2025-04-01-00"), 1000L));
+    when(mockTable.getTableId()).thenReturn(TableIdentifier.of("test_db", "test_table"));
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(partitionFiles);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method discoverMethod = IcebergSource.class.getDeclaredMethod(
+        "discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES), "2025-04-01-00",
+        "Explicitly setting hour=0 should produce the same -00 suffix as the default");
+  }
+
+  // ---- Tests for iceberg.lookback.hours (hourly lookback via IcebergPartitionFilterGenerator) ----
+
+  @Test
+  @SuppressWarnings("unchecked") // Method.invoke returns Object; the List<FilePathWithPartition> cast is unchecked.
+  public void testLookbackHoursBasic() throws Exception {
+    // iceberg.lookback.hours=3 should produce 3 hourly partition values, stepping back one hour from the start hour.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "14");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+        new FilePathWithPartition("/data/f1.parquet", createPartitionMap("datepartition", "2025-04-01-14"), 1000L),
+        new FilePathWithPartition("/data/f2.parquet", createPartitionMap("datepartition", "2025-04-01-13"), 1000L),
+        new FilePathWithPartition("/data/f3.parquet", createPartitionMap("datepartition", "2025-04-01-12"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+        (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 files for 3 hourly partitions");
+
+    // Fail with a descriptive assertion (instead of an NPE from split) if the property was never written.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] values = partitionValues.split(",");
+    Assert.assertEquals(values.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(values[0], "2025-04-01-14", "Hour 0: 14");
+    Assert.assertEquals(values[1], "2025-04-01-13", "Hour 1: 13");
+    Assert.assertEquals(values[2], "2025-04-01-12", "Hour 2: 12");
+  }
+
+  @Test
+  public void testLookbackHoursCrossesDateBoundary() throws Exception {
+    // lookbackHours=3 starting at hour 1 should cross midnight into the previous day.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "1");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+        new FilePathWithPartition("/data/f1.parquet", createPartitionMap("datepartition", "2025-04-01-01"), 1000L),
+        new FilePathWithPartition("/data/f2.parquet", createPartitionMap("datepartition", "2025-04-01-00"), 1000L),
+        new FilePathWithPartition("/data/f3.parquet", createPartitionMap("datepartition", "2025-03-31-23"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    // Check presence and count first, so a short or missing result fails with an assertion
+    // rather than an NPE or ArrayIndexOutOfBoundsException.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] values = partitionValues.split(",");
+    Assert.assertEquals(values.length, 3, "Should have 3 partition values");
+    Assert.assertEquals(values[0], "2025-04-01-01");
+    Assert.assertEquals(values[1], "2025-04-01-00");
+    Assert.assertEquals(values[2], "2025-03-31-23", "Should cross midnight into previous day");
+  }
+
+  @Test
+  public void testLookbackHoursTakesPrecedenceOverLookbackDays() throws Exception {
+    // When both lookback.hours and lookback.days are set, hours must win.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "10");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "5"); // should be ignored
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+        new FilePathWithPartition("/data/f1.parquet", createPartitionMap("datepartition", "2025-04-01-10"), 500L),
+        new FilePathWithPartition("/data/f2.parquet", createPartitionMap("datepartition", "2025-04-01-09"), 500L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    // Should produce exactly 2 hourly values, not 5 daily values.
+    // Fail with a descriptive assertion (instead of an NPE from split) if the property was never written.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] values = partitionValues.split(",");
+    Assert.assertEquals(values.length, 2,
+        "lookback.hours should take precedence, producing 2 values not 5");
+    Assert.assertEquals(values[0], "2025-04-01-10");
+    Assert.assertEquals(values[1], "2025-04-01-09");
+  }
+
+  @Test
+  public void testLookbackHoursWithReversedDateFormat() throws Exception {
+    // Hourly lookback with dd-MM-yyyy-HH format should produce reversed-date hourly values.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT, "dd-MM-yyyy-HH");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "5");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> files = Arrays.asList(
+        new FilePathWithPartition("/data/f1.parquet", createPartitionMap("datepartition", "01-04-2025-05"), 1000L),
+        new FilePathWithPartition("/data/f2.parquet", createPartitionMap("datepartition", "01-04-2025-04"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))).thenReturn(files);
+
+    // Call discoverPartitionFilePaths reflectively, bypassing access checks.
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
+    // Check presence and count first, so a short or missing result fails with an assertion
+    // rather than an NPE or ArrayIndexOutOfBoundsException.
+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] values = partitionValues.split(",");
+    Assert.assertEquals(values.length, 2, "Should have 2 partition values");
+    Assert.assertEquals(values[0], "01-04-2025-05");
+    Assert.assertEquals(values[1], "01-04-2025-04");
+  }
+
+ @Test
+ public void testLookbackHoursWithCurrentDate() throws Exception {
+ // CURRENT_DATE placeholder works with hourly lookback
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+ properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_FORMAT,
"yyyy-MM-dd-HH");
+ properties.setProperty(IcebergSource.ICEBERG_PARTITION_HOUR, "6");
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_HOURS, "2");
+ sourceState = new SourceState(new State(properties));
+
+ String today = java.time.LocalDate.now().toString();
+ List<FilePathWithPartition> files = Arrays.asList(
+ new FilePathWithPartition("/data/f1.parquet",
createPartitionMap("datepartition", today + "-06"), 1000L),
+ new FilePathWithPartition("/data/f2.parquet",
createPartitionMap("datepartition", today + "-05"), 1000L)
Review Comment:
Updated the code to address the test flakiness.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]