This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8109e420e6 Backporting Flink: Watermark Read Options to 1.17 and 1.16 (#9456)
8109e420e6 is described below

commit 8109e420e6d563a73e17ff23c321f1a48a2b976d
Author: Rodrigo <[email protected]>
AuthorDate: Fri Jan 12 22:15:55 2024 -0800

    Backporting Flink: Watermark Read Options to 1.17 and 1.16 (#9456)
---
 .../org/apache/iceberg/flink/FlinkReadConf.java    |  19 +++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |  11 ++
 .../apache/iceberg/flink/source/IcebergSource.java |  19 ++-
 .../apache/iceberg/flink/source/ScanContext.java   |  38 +++++-
 .../java/org/apache/iceberg/flink/TestHelpers.java |  17 ++-
 .../iceberg/flink/source/TestIcebergSourceSql.java | 130 +++++++++++++++++++--
 .../org/apache/iceberg/flink/FlinkReadConf.java    |  19 +++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |  11 ++
 .../apache/iceberg/flink/source/IcebergSource.java |  19 ++-
 .../apache/iceberg/flink/source/ScanContext.java   |  38 +++++-
 .../java/org/apache/iceberg/flink/TestHelpers.java |  18 ++-
 .../iceberg/flink/source/TestIcebergSourceSql.java | 130 +++++++++++++++++++--
 12 files changed, 431 insertions(+), 38 deletions(-)
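
For readers who want to try the backported options, here is a minimal DataStream usage sketch (not part of this commit). The warehouse path, class name, and job wiring are placeholders; watermarkColumn and watermarkColumnTimeUnit are the builder methods touched below:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    public class WatermarkReadExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder table location; any TableLoader works here.
        TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");

        IcebergSource<RowData> source =
            IcebergSource.forRowData()
                .tableLoader(loader)
                .watermarkColumn("t1") // splits are ordered by this column's min statistic
                .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS) // only relevant for long columns
                .build();

        DataStream<RowData> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-watermark-source");
        stream.print();
        env.execute("watermark-read-example");
      }
    }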

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index 0e04c9affb..d53ea73f93 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public class FlinkReadConf {
         .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
         .parse();
   }
+
+  public String watermarkColumn() {
+    return confParser
+        .stringConf()
+        .option(FlinkReadOptions.WATERMARK_COLUMN)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+        .parseOptional();
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return confParser
+        .enumConfParser(TimeUnit.class)
+        .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 55c5aca3b6..1bbd88146c 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ public class FlinkReadOptions {
   public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
   public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
       ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+  public static final String WATERMARK_COLUMN = "watermark-column";
+  public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+
+  public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
+  public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
+          .enumType(TimeUnit.class)
+          .defaultValue(TimeUnit.MICROSECONDS);
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index a7ce2db61f..0655cf87a9 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
 import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
-    private String watermarkColumn;
-    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
-      Preconditions.checkArgument(
-          watermarkColumn == null,
-          "Watermark column and SplitAssigner should not be set in the same 
source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -441,7 +437,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
      * Emits watermarks once per split based on the min value of column statistics from files
      * metadata in the given split. The generated watermarks are also used for ordering the splits
      * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
-     * setting {@link #watermarkTimeUnit(TimeUnit)}.
+     * setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
      *
     * <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
      * split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       Preconditions.checkArgument(
           splitAssignerFactory == null,
           "Watermark column and SplitAssigner should not be set in the same source");
-      this.watermarkColumn = columnName;
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
       return this;
     }
 
@@ -459,8 +455,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
      * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
      * value. The default value is {@link TimeUnit#MICROSECONDS}.
      */
-    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-      this.watermarkTimeUnit = timeUnit;
+    public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
       return this;
     }
 
@@ -482,13 +478,16 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
       SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+      String watermarkColumn = flinkReadConf.watermarkColumn();
+      TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
+
       if (watermarkColumn != null) {
         // Column statistics is needed for watermark generation
         contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
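
The build() change above routes the watermark settings through FlinkReadConf instead of dedicated builder fields. A small sketch of that resolution path, assuming an already-loaded Table instance; the class name and option values are illustrative only:

    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.FlinkReadConf;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class WatermarkConfResolution {
      static void resolve(Table table) {
        // Read options use the un-prefixed keys added in FlinkReadOptions.
        Map<String, String> readOptions =
            ImmutableMap.of(
                "watermark-column", "t1",
                "watermark-column-time-unit", "MILLISECONDS");
        FlinkReadConf conf = new FlinkReadConf(table, readOptions, new Configuration());
        String watermarkColumn = conf.watermarkColumn();     // "t1"
        TimeUnit timeUnit = conf.watermarkColumnTimeUnit();  // TimeUnit.MILLISECONDS
      }
    }
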
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 4357b1f57d..3dce5dd590 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
   private final int maxAllowedPlanningFailures;
+  private final String watermarkColumn;
+  private final TimeUnit watermarkColumnTimeUnit;
 
   private ScanContext(
       boolean caseSensitive,
@@ -91,6 +94,8 @@ public class ScanContext implements Serializable {
       Integer planParallelism,
       int maxPlanningSnapshotCount,
       int maxAllowedPlanningFailures,
+      String watermarkColumn,
+      TimeUnit watermarkColumnTimeUnit,
       String branch,
       String tag,
       String startTag,
@@ -122,6 +127,8 @@ public class ScanContext implements Serializable {
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
+    this.watermarkColumn = watermarkColumn;
+    this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
 
     validate();
   }
@@ -272,6 +279,14 @@ public class ScanContext implements Serializable {
     return maxAllowedPlanningFailures;
   }
 
+  public String watermarkColumn() {
+    return watermarkColumn;
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return watermarkColumnTimeUnit;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public class ScanContext implements Serializable {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
@@ -327,6 +344,8 @@ public class ScanContext implements Serializable {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
@@ -367,6 +386,9 @@ public class ScanContext implements Serializable {
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+    private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkColumnTimeUnit =
+        FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -500,6 +522,16 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder watermarkColumn(String newWatermarkColumn) {
+      this.watermarkColumn = newWatermarkColumn;
+      return this;
+    }
+
+    public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+      this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public class ScanContext implements Serializable {
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
           .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
     }
 
     public ScanContext build() {
@@ -552,6 +586,8 @@ public class ScanContext implements Serializable {
           planParallelism,
           maxPlanningSnapshotCount,
           maxAllowedPlanningFailures,
+          watermarkColumn,
+          watermarkColumnTimeUnit,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 3e574b841c..b33f181dac 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -126,7 +126,7 @@ public class TestHelpers {
         .collect(Collectors.toList());
   }
 
-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+  private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
     List<Row> expected = Lists.newArrayList();
     @SuppressWarnings("unchecked")
     DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public class TestHelpers {
                 TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
     expectedRecords.forEach(
         r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+    return expected;
+  }
+
+  public static void assertRecordsWithOrder(
+      List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
+    assertRowsWithOrder(results, expected);
+  }
+
+  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
     assertRows(results, expected);
   }
 
@@ -146,6 +157,10 @@ public class TestHelpers {
     Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).containsExactlyElementsOf(expected);
+  }
+
   public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
     assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
   }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index e66ae79c28..4250460d27 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -18,25 +18,141 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
 
 /** Use the IcebergSource (FLIP-27) */
 public class TestIcebergSourceSql extends TestSqlBase {
+  private static final Schema SCHEMA_TS =
+      new Schema(
+          required(1, "t1", Types.TimestampType.withoutZone()),
+          required(2, "t2", Types.LongType.get()));
+
   @Override
   public void before() throws IOException {
-    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    TableEnvironment tableEnvironment = getTableEnv();
+    Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+
+    tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
     SqlHelpers.sql(
-        getTableEnv(),
+        tableEnvironment,
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
         catalogResource.warehouse());
-    SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
-    getTableEnv()
-        .getConfig()
-        .getConfiguration()
-        .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+    SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+    tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+  }
+
+  private Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /** Generates the records in the expected order, with respect to their datafile */
+  private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    Record file1Record1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record file1Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(file1Record1);
+    recordsDataFile1.add(file1Record2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+
+    Record file2Record1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record file2Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(file2Record1);
+    recordsDataFile2.add(file2Record2);
+
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // Expected records if the splits are ordered
+    //     - ascending (watermark from t1) - records from the split with early timestamps, then
+    // records from the split with late timestamps
+    //     - descending (watermark from t2) - records from the split with old longs, then records
+    // from the split with new longs
+    List<Record> expected = Lists.newArrayList();
+    if (ascending) {
+      expected.addAll(recordsDataFile1);
+      expected.addAll(recordsDataFile2);
+    } else {
+      expected.addAll(recordsDataFile2);
+      expected.addAll(recordsDataFile1);
+    }
+    return expected;
+  }
+
+  /** Tests the order of splits returned when setting the watermark-column options */
+  @Test
+  public void testWatermarkOptionsAscending() throws Exception {
+    List<Record> expected = generateExpectedRecords(true);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", 
"128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and
+   * watermark-column-time-unit options
+   */
+  @Test
+  public void testWatermarkOptionsDescending() throws Exception {
+    List<Record> expected = generateExpectedRecords(false);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of(
+                "watermark-column",
+                "t2",
+                "watermark-column-time-unit",
+                "MILLISECONDS",
+                "split-file-open-cost",
+                "128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
   }
 }
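
The new SQL tests above pass the watermark options as dynamic table options. An equivalent standalone sketch, with placeholder catalog/database/table names and catalog registration omitted, assuming dynamic table options are enabled as in before():

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.TableConfigOptions;

    public class WatermarkSqlHintExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.getConfig()
            .getConfiguration()
            .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
        // "db.tbl" is a placeholder; register an Iceberg catalog first, as the test does.
        tEnv.executeSql(
                "SELECT * FROM db.tbl /*+ OPTIONS('watermark-column'='t2', "
                    + "'watermark-column-time-unit'='MILLISECONDS', "
                    + "'split-file-open-cost'='128000000') */")
            .print();
      }
    }
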
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index 0e04c9affb..d53ea73f93 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public class FlinkReadConf {
         .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
         .parse();
   }
+
+  public String watermarkColumn() {
+    return confParser
+        .stringConf()
+        .option(FlinkReadOptions.WATERMARK_COLUMN)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+        .parseOptional();
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return confParser
+        .enumConfParser(TimeUnit.class)
+        .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 55c5aca3b6..1bbd88146c 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ public class FlinkReadOptions {
   public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
   public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
       ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+  public static final String WATERMARK_COLUMN = "watermark-column";
+  public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+
+  public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
+  public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
+          .enumType(TimeUnit.class)
+          .defaultValue(TimeUnit.MICROSECONDS);
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index a7ce2db61f..0655cf87a9 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
 import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
-    private String watermarkColumn;
-    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
-      Preconditions.checkArgument(
-          watermarkColumn == null,
-          "Watermark column and SplitAssigner should not be set in the same 
source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -441,7 +437,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
      * Emits watermarks once per split based on the min value of column statistics from files
      * metadata in the given split. The generated watermarks are also used for ordering the splits
      * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
-     * setting {@link #watermarkTimeUnit(TimeUnit)}.
+     * setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
      *
     * <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
      * split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       Preconditions.checkArgument(
           splitAssignerFactory == null,
           "Watermark column and SplitAssigner should not be set in the same source");
-      this.watermarkColumn = columnName;
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
       return this;
     }
 
@@ -459,8 +455,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
      * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
      * value. The default value is {@link TimeUnit#MICROSECONDS}.
      */
-    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-      this.watermarkTimeUnit = timeUnit;
+    public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
       return this;
     }
 
@@ -482,13 +478,16 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
       SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+      String watermarkColumn = flinkReadConf.watermarkColumn();
+      TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
+
       if (watermarkColumn != null) {
         // Column statistics is needed for watermark generation
         contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 4357b1f57d..3dce5dd590 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
   private final int maxAllowedPlanningFailures;
+  private final String watermarkColumn;
+  private final TimeUnit watermarkColumnTimeUnit;
 
   private ScanContext(
       boolean caseSensitive,
@@ -91,6 +94,8 @@ public class ScanContext implements Serializable {
       Integer planParallelism,
       int maxPlanningSnapshotCount,
       int maxAllowedPlanningFailures,
+      String watermarkColumn,
+      TimeUnit watermarkColumnTimeUnit,
       String branch,
       String tag,
       String startTag,
@@ -122,6 +127,8 @@ public class ScanContext implements Serializable {
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
+    this.watermarkColumn = watermarkColumn;
+    this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
 
     validate();
   }
@@ -272,6 +279,14 @@ public class ScanContext implements Serializable {
     return maxAllowedPlanningFailures;
   }
 
+  public String watermarkColumn() {
+    return watermarkColumn;
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return watermarkColumnTimeUnit;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public class ScanContext implements Serializable {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
@@ -327,6 +344,8 @@ public class ScanContext implements Serializable {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
@@ -367,6 +386,9 @@ public class ScanContext implements Serializable {
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+    private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkColumnTimeUnit =
+        FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -500,6 +522,16 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder watermarkColumn(String newWatermarkColumn) {
+      this.watermarkColumn = newWatermarkColumn;
+      return this;
+    }
+
+    public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+      this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public class ScanContext implements Serializable {
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
           .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
     }
 
     public ScanContext build() {
@@ -552,6 +586,8 @@ public class ScanContext implements Serializable {
           planParallelism,
           maxPlanningSnapshotCount,
           maxAllowedPlanningFailures,
+          watermarkColumn,
+          watermarkColumnTimeUnit,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index d2d0c97f4b..80e5ddd24f 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -68,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.assertj.core.api.Assertions;
 
 public class TestHelpers {
   private TestHelpers() {}
@@ -126,7 +127,7 @@ public class TestHelpers {
         .collect(Collectors.toList());
   }
 
-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+  private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
     List<Row> expected = Lists.newArrayList();
     @SuppressWarnings("unchecked")
     DataStructureConverter<RowData, Row> converter =
@@ -135,6 +136,17 @@ public class TestHelpers {
                 TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
     expectedRecords.forEach(
         r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+    return expected;
+  }
+
+  public static void assertRecordsWithOrder(
+      List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
+    assertRowsWithOrder(results, expected);
+  }
+
+  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
     assertRows(results, expected);
   }
 
@@ -146,6 +158,10 @@ public class TestHelpers {
     assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).containsExactlyElementsOf(expected);
+  }
+
   public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
     assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
   }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index e66ae79c28..4250460d27 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -18,25 +18,141 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
 
 /** Use the IcebergSource (FLIP-27) */
 public class TestIcebergSourceSql extends TestSqlBase {
+  private static final Schema SCHEMA_TS =
+      new Schema(
+          required(1, "t1", Types.TimestampType.withoutZone()),
+          required(2, "t2", Types.LongType.get()));
+
   @Override
   public void before() throws IOException {
-    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    TableEnvironment tableEnvironment = getTableEnv();
+    Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+
+    tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
     SqlHelpers.sql(
-        getTableEnv(),
+        tableEnvironment,
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
         catalogResource.warehouse());
-    SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
-    getTableEnv()
-        .getConfig()
-        .getConfiguration()
-        .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+    SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+    tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+  }
+
+  private Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /** Generates the records in the expected order, with respect to their datafile */
+  private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    Record file1Record1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record file1Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(file1Record1);
+    recordsDataFile1.add(file1Record2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+
+    Record file2Record1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record file2Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(file2Record1);
+    recordsDataFile2.add(file2Record2);
+
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // Expected records if the splits are ordered
+    //     - ascending (watermark from t1) - records from the split with early timestamps, then
+    // records from the split with late timestamps
+    //     - descending (watermark from t2) - records from the split with old longs, then records
+    // from the split with new longs
+    List<Record> expected = Lists.newArrayList();
+    if (ascending) {
+      expected.addAll(recordsDataFile1);
+      expected.addAll(recordsDataFile2);
+    } else {
+      expected.addAll(recordsDataFile2);
+      expected.addAll(recordsDataFile1);
+    }
+    return expected;
+  }
+
+  /** Tests the order of splits returned when setting the watermark-column options */
+  @Test
+  public void testWatermarkOptionsAscending() throws Exception {
+    List<Record> expected = generateExpectedRecords(true);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", 
"128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and
+   * watermark-column-time-unit options
+   */
+  @Test
+  public void testWatermarkOptionsDescending() throws Exception {
+    List<Record> expected = generateExpectedRecords(false);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of(
+                "watermark-column",
+                "t2",
+                "watermark-column-time-unit",
+                "MILLISECONDS",
+                "split-file-open-cost",
+                "128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
   }
 }

