This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e1577e62b Flink: Backport support source watermark for flink sql windows (#12697)
2e1577e62b is described below

commit 2e1577e62bf023daddb0ad7aec5ce6f094fb98dc
Author: Swapna Marru <[email protected]>
AuthorDate: Tue Apr 1 15:27:43 2025 -0700

    Flink: Backport support source watermark for flink sql windows (#12697)
    
    backports (#12191)
---
 .../iceberg/flink/source/IcebergTableSource.java   | 16 ++++++-
 .../iceberg/flink/source/TestIcebergSourceSql.java | 54 +++++++++++++++++++++-
 .../apache/iceberg/flink/source/TestSqlBase.java   | 15 ++++++
 .../iceberg/flink/source/IcebergTableSource.java   | 16 ++++++-
 .../iceberg/flink/source/TestIcebergSourceSql.java | 54 +++++++++++++++++++++-
 .../apache/iceberg/flink/source/TestSqlBase.java   | 15 ++++++
 6 files changed, 166 insertions(+), 4 deletions(-)

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 65adce77d9..662dc30e27 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,12 +35,14 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
     implements ScanTableSource,
         SupportsProjectionPushDown,
         SupportsFilterPushDown,
-        SupportsLimitPushDown {
+        SupportsLimitPushDown,
+        SupportsSourceWatermark {
 
   private int[] projectedFields;
   private Long limit;
@@ -175,6 +178,17 @@ public class IcebergTableSource
     return Result.of(acceptedFilters, flinkFilters);
   }
 
+  @Override
+  public void applySourceWatermark() {
+    Preconditions.checkArgument(
+        
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
+        "Source watermarks are supported only in flip-27 iceberg source 
implementation");
+
+    Preconditions.checkNotNull(
+        properties.get(FlinkReadOptions.WATERMARK_COLUMN),
+        "watermark-column needs to be configured to use source watermark.");
+  }
+
   @Override
   public boolean supportsNestedProjection() {
     // TODO: support nested projection
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index c8f0b8172d..0cdaf8371c 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -40,6 +41,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
   @BeforeEach
   @Override
   public void before() throws IOException {
-    TableEnvironment tableEnvironment = getTableEnv();
+    setUpTableEnv(getTableEnv());
+    setUpTableEnv(getStreamingTableEnv());
+  }
+
+  private static void setUpTableEnv(TableEnvironment tableEnvironment) {
     Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
     // Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
     tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, 
true);
   }
 
+  @AfterEach
+  public void after() throws IOException {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
   private Record generateRecord(Instant t1, long t2) {
     Record record = GenericRecord.create(SCHEMA_TS);
     record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public class TestIcebergSourceSql extends TestSqlBase {
         expected,
         SCHEMA_TS);
   }
+
+  @Test
+  public void testWatermarkInvalidConfig() {
+    CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
SCHEMA_TS);
+
+    String flinkTable = "`default_catalog`.`default_database`.flink_table";
+    SqlHelpers.sql(
+        getStreamingTableEnv(),
+        "CREATE TABLE %s "
+            + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+            + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE 
iceberg_catalog.`default`.%s",
+        flinkTable,
+        TestFixtures.TABLE);
+
+    assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * 
FROM %s", flinkTable))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("watermark-column needs to be configured to use source 
watermark.");
+  }
+
+  @Test
+  public void testWatermarkValidConfig() throws Exception {
+    List<Record> expected = generateExpectedRecords(true);
+
+    String flinkTable = "`default_catalog`.`default_database`.flink_table";
+
+    SqlHelpers.sql(
+        getStreamingTableEnv(),
+        "CREATE TABLE %s "
+            + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+            + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH 
('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
+        flinkTable,
+        TestFixtures.TABLE);
+
+    TestHelpers.assertRecordsWithOrder(
+        SqlHelpers.sql(
+            getStreamingTableEnv(),
+            "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), 
INTERVAL '1' SECOND))",
+            flinkTable),
+        expected,
+        SCHEMA_TS);
+  }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index f9b776397c..dd63154fe0 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
 
   private volatile TableEnvironment tEnv;
 
+  private volatile TableEnvironment streamingTEnv;
+
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
       synchronized (this) {
@@ -75,6 +77,19 @@ public abstract class TestSqlBase {
     return tEnv;
   }
 
+  protected TableEnvironment getStreamingTableEnv() {
+    if (streamingTEnv == null) {
+      synchronized (this) {
+        if (streamingTEnv == null) {
+          this.streamingTEnv =
+              
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        }
+      }
+    }
+
+    return streamingTEnv;
+  }
+
   @BeforeEach
   public abstract void before() throws IOException;
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 65adce77d9..662dc30e27 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,12 +35,14 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
     implements ScanTableSource,
         SupportsProjectionPushDown,
         SupportsFilterPushDown,
-        SupportsLimitPushDown {
+        SupportsLimitPushDown,
+        SupportsSourceWatermark {
 
   private int[] projectedFields;
   private Long limit;
@@ -175,6 +178,17 @@ public class IcebergTableSource
     return Result.of(acceptedFilters, flinkFilters);
   }
 
+  @Override
+  public void applySourceWatermark() {
+    Preconditions.checkArgument(
+        
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
+        "Source watermarks are supported only in flip-27 iceberg source 
implementation");
+
+    Preconditions.checkNotNull(
+        properties.get(FlinkReadOptions.WATERMARK_COLUMN),
+        "watermark-column needs to be configured to use source watermark.");
+  }
+
   @Override
   public boolean supportsNestedProjection() {
     // TODO: support nested projection
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index c8f0b8172d..0cdaf8371c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -40,6 +41,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
   @BeforeEach
   @Override
   public void before() throws IOException {
-    TableEnvironment tableEnvironment = getTableEnv();
+    setUpTableEnv(getTableEnv());
+    setUpTableEnv(getStreamingTableEnv());
+  }
+
+  private static void setUpTableEnv(TableEnvironment tableEnvironment) {
     Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
     // Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
     tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, 
true);
   }
 
+  @AfterEach
+  public void after() throws IOException {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
   private Record generateRecord(Instant t1, long t2) {
     Record record = GenericRecord.create(SCHEMA_TS);
     record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public class TestIcebergSourceSql extends TestSqlBase {
         expected,
         SCHEMA_TS);
   }
+
+  @Test
+  public void testWatermarkInvalidConfig() {
+    CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
SCHEMA_TS);
+
+    String flinkTable = "`default_catalog`.`default_database`.flink_table";
+    SqlHelpers.sql(
+        getStreamingTableEnv(),
+        "CREATE TABLE %s "
+            + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+            + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE 
iceberg_catalog.`default`.%s",
+        flinkTable,
+        TestFixtures.TABLE);
+
+    assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * 
FROM %s", flinkTable))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("watermark-column needs to be configured to use source 
watermark.");
+  }
+
+  @Test
+  public void testWatermarkValidConfig() throws Exception {
+    List<Record> expected = generateExpectedRecords(true);
+
+    String flinkTable = "`default_catalog`.`default_database`.flink_table";
+
+    SqlHelpers.sql(
+        getStreamingTableEnv(),
+        "CREATE TABLE %s "
+            + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+            + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH 
('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
+        flinkTable,
+        TestFixtures.TABLE);
+
+    TestHelpers.assertRecordsWithOrder(
+        SqlHelpers.sql(
+            getStreamingTableEnv(),
+            "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), 
INTERVAL '1' SECOND))",
+            flinkTable),
+        expected,
+        SCHEMA_TS);
+  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index f9b776397c..dd63154fe0 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
 
   private volatile TableEnvironment tEnv;
 
+  private volatile TableEnvironment streamingTEnv;
+
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
       synchronized (this) {
@@ -75,6 +77,19 @@ public abstract class TestSqlBase {
     return tEnv;
   }
 
+  protected TableEnvironment getStreamingTableEnv() {
+    if (streamingTEnv == null) {
+      synchronized (this) {
+        if (streamingTEnv == null) {
+          this.streamingTEnv =
+              
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        }
+      }
+    }
+
+    return streamingTEnv;
+  }
+
   @BeforeEach
   public abstract void before() throws IOException;
 

Reply via email to