stevenzwu commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1434494178


##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -489,25 +483,27 @@ public IcebergSource<T> build() {
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
       SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+
+      ScanContext context = contextBuilder.build();
+      String watermarkColumn = context.watermarkColumn();
+      TimeUnit watermarkTimeUnit = context.watermarkTimeUnit();
+
       if (watermarkColumn != null) {
         // Column statistics is needed for watermark generation
-        contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
-
+        context = context.copyWithColumnStat(watermarkColumn);

Review Comment:
   Why do we change the model, building the context in line 493 and copying it here? This is also related to the removal of line 510 below. I'm trying to understand the benefit of this change.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;

Review Comment:
   Curious why `baseTime` is hardcoded. Why not `System.currentTimeMillis()`?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record early1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));

Review Comment:
   Why do we use different values for t1 and t2? Wouldn't it be more intuitive if they were the same?
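
   For readers following the arithmetic in the quoted test, here is a minimal sketch of the same offsets written with `java.time.Duration`. The class and method names are hypothetical and not part of the PR; only the base time and the 30-day offset are taken from the diff above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the PR: names the offsets used by the quoted test.
final class WatermarkTestOffsets {
  // Same fixed epoch-millisecond base time as in the quoted test.
  static final long BASE_TIME = 1702382109000L;

  // 30 days in milliseconds; equal to 1000 * 60 * 60 * 24 * 30L from the diff.
  static long thirtyDaysMillis() {
    return Duration.ofDays(30).toMillis();
  }

  // t1 of the first record: the base time as an Instant.
  static Instant earlyT1() {
    return Instant.ofEpochMilli(BASE_TIME);
  }

  // t2 of the first record: 30 days after the base time, as epoch milliseconds.
  static long earlyT2() {
    return BASE_TIME + thirtyDaysMillis();
  }
}
```

   The 30-day offset exceeds `Integer.MAX_VALUE`, which is why the original expression needs the `L` suffix on its last factor.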



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -146,6 +159,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
     Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).isEqualTo(expected);

Review Comment:
   Use `containsExactly` here?



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -489,25 +483,27 @@ public IcebergSource<T> build() {
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-

Review Comment:
   nit: unnecessary empty line change



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record early1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record early2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(early1);
+    recordsDataFile1.add(early2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - old timestamps, old longs
+    Record late1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record late2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(late1);
+    recordsDataFile2.add(late2);
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // first assertion: early and then late
+    List<Record> expected = Lists.newArrayList();
+    expected.addAll(recordsDataFile1);
+    expected.addAll(recordsDataFile2);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            SCHEMA_TS,
+            ImmutableList.of(),
+            ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", "128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+
+    // second assertion: late and then early
+    expected.clear();
+    expected.addAll(recordsDataFile2);
+    expected.addAll(recordsDataFile1);
+    TestHelpers.assertRecordsWithOrder(

Review Comment:
   This second assertion should be separated out into another test method.



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java:
##########
@@ -298,6 +313,40 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkTimeUnit(watermarkTimeUnit)
+        .build();
+  }
+
+  public ScanContext copyWithColumnStat(String columnStat) {

Review Comment:
   Related to the earlier comment. This may not be needed.



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -489,25 +483,27 @@ public IcebergSource<T> build() {
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
       SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+

Review Comment:
   nit: empty line probably not needed



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -126,6 +126,19 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
         .collect(Collectors.toList());
   }
 
+  public static void assertRecordsWithOrder(
+      List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = Lists.newArrayList();
+    @SuppressWarnings("unchecked")

Review Comment:
   We can extract a shared method to avoid duplication with the method below.



##########
docs/flink-configuration.md:
##########
@@ -133,7 +132,8 @@ env.getConfig()
 | max-planning-snapshot-count   | connector.iceberg.max-planning-snapshot-count   | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
 | limit                         | connector.iceberg.limit                         | N/A | -1 | Limited output number of rows. |
 | max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure. |
-
+| watermark-column              | connector.iceberg.watermark-column              | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. |
+| watermark-timeunit            | connector.iceberg.watermark-timeunit            | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |

Review Comment:
   The name `watermark-timeunit` can be misinterpreted as the Flink watermark time unit. Maybe `watermark-column-timeunit` is clearer.
   
   I know the name comes from the Java API `public Builder<T> watermarkTimeUnit(TimeUnit timeUnit)`. I think we probably made a small mistake there; it could have been `watermarkColumnTimeUnit`.
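
   To illustrate why the unit belongs to the column rather than to the watermark itself, here is a minimal sketch, assuming the watermark is expressed in epoch milliseconds and the column holds raw `long` values. The class and method names are hypothetical; this is not the connector's implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not the connector's code.
final class ColumnTimeUnitSketch {
  // Converts a raw long column value, stored in the given unit, to epoch milliseconds.
  static long toEpochMillis(long columnValue, TimeUnit columnUnit) {
    return columnUnit.toMillis(columnValue);
  }
}
```

   The same instant stored as microseconds or as seconds yields the same millisecond value only when the column's unit is declared correctly, which is what the option describes.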



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -131,16 +131,20 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
   private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
+    IcebergSource.Builder builder =
         IcebergSource.forRowData()
             .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
             .properties(properties)
             .project(getProjectedSchema())
             .limit(limit)
             .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
+            .flinkConfig(readableConfig);
+
+    if (assignerType != null) {

Review Comment:
   Why is this changed? The assigner type has a default value, so the old code should be fine, right?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);

Review Comment:
   This new test is probably better added to TestIcebergSourceSql.
   
   The base class of TestIcebergSourceBoundedSql also tests different file formats, which is not necessary for advanced options like watermark alignment.


