stevenzwu commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1397782440


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
+      SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      if (watermarkColumn != null) {

Review Comment:
   Surfacing the comment again on avoiding multiple small files in one split: it can increase out-of-orderness, as multiple files with different time ranges are merged into one split.
   
   `read.split.open-file-cost` doesn't affect splitting big files; it just avoids bundling multiple smaller files into one split.
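   
   For illustration, a minimal sketch of how a job could discourage bundling, assuming the builder exposes a `splitOpenFileCost` option mirroring `read.split.open-file-cost` (`tableLoader` is a placeholder):
   
   ```java
   // Raising the open-file cost makes each small file "cost" a large chunk of a
   // split, so files with different time ranges are less likely to be bundled.
   IcebergSource<RowData> source =
       IcebergSource.forRowData()
           .tableLoader(tableLoader)
           .splitOpenFileCost(128L * 1024 * 1024) // treat every file as >= 128 MB
           .build();
   ```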



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {

Review Comment:
   I feel the functional test also covers the general/base part.
   
   I will leave the decision to you if you think it is better to have a separate base test.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The Emitter emits watermarks at the beginning of every split provided by the {@link
+ * SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+  private final SplitWatermarkExtractor timeExtractor;
+  private String lastSplitId = null;
+  private long watermark;
+
+  WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+    this.timeExtractor = timeExtractor;
+  }
+
+  @Override
+  public void emitRecord(
+      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
+    if (!split.splitId().equals(lastSplitId)) {
+      long newWatermark = timeExtractor.extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "previous watermark = {}, current watermark = {}, previous split = 
{}, current split = {}",

Review Comment:
   > we need a summary at the beginning of the log message. "Received a new split with lower watermark. ..."
   
   Bumping this comment.
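   
   A minimal sketch of the suggested message shape, reusing the variables already in scope (the wording is just an example):
   
   ```java
   // Lead with a human-readable summary, then the details.
   LOG.info(
       "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, "
           + "previous split = {}, current split = {}",
       watermark,
       newWatermark,
       lastSplitId,
       split.splitId());
   ```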



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+          required(2, "timestamptz_column", Types.TimestampType.withZone()),
+          required(3, "long_column", Types.LongType.get()),
+          required(4, "string_column", Types.StringType.get()));
+
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+  private static final List<List<Record>> TEST_RECORDS =
+      ImmutableList.of(
+          RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
+
+  private static final List<Map<String, Long>> MIN_VALUES =
+      ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+
+  private final String columnName;
+
+  @BeforeClass
+  public static void updateMinValue() {
+    for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+      for (Record r : TEST_RECORDS.get(i)) {
+        Map<String, Long> minValues = MIN_VALUES.get(i);
+
+        LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+        minValues.merge(
+            "timestamp_column", 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+        OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+        minValues.merge("timestamptz_column", 
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+        minValues.merge("long_column", (Long) r.get(2), Math::min);
+      }
+    }
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return ImmutableList.of(
+        new Object[] {"timestamp_column"},
+        new Object[] {"timestamptz_column"},
+        new Object[] {"long_column"});
+  }
+
+  public TestColumnStatsWatermarkExtractor(String columnName) {
+    this.columnName = columnName;
+  }
+
+  @Test
+  public void testSingle() throws IOException {
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+  }
+
+  @Test
+  public void testTimeUnit() throws IOException {
+    Assume.assumeTrue("Run only for long column", columnName.equals("long_column"));

Review Comment:
   This is where JUnit 5 can really help, due to its flexible parameterization support at the method level.
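   
   For example, a sketch of what method-level parameterization could look like after a JUnit 5 migration (names and assertions are illustrative, and the method would slot into the existing test class):
   
   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;
   
   // The method declares its own parameters, so no Assume-based skipping
   // of class-level parameters is needed.
   @ParameterizedTest
   @ValueSource(strings = {"long_column"})
   void testTimeUnit(String columnName) throws IOException {
     ColumnStatsWatermarkExtractor extractor =
         new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);
     // ... assert on extractor.extractWatermark(...) as in the existing test
   }
   ```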



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {

Review Comment:
   Here is one idea for A/B testing the ordering part: use allowed lateness to drop late-arriving records (a sketch of the allowed-lateness wiring follows this list).
   
   * create 3 snapshots (one file each); the 3 files have timestamps like 1, 50, 10
   * job parallelism 2, allowed lateness set to 20; make sure the target split size yields 3 splits
   * slow down the processing of files 1 and 2 with some sleep; make sure the watermark has advanced to 50 before file 3 is processed
   * test A: use the sequence-number-ordered assigner plus allowed lateness; the records from file 3 should have been dropped and not appear in the final output
   * test B: enable watermark alignment; records from all 3 files should be kept in the final output
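   
   A minimal sketch of test A's allowed-lateness wiring, reusing the window shape from this test (the values are illustrative):
   
   ```java
   // Late records are accepted until the watermark passes the window end by
   // more than 20 minutes; anything later (the slow file 3 records) is dropped.
   DataStream<RowData> windowed =
       stream
           .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
           .allowedLateness(Time.minutes(20))
           .apply(
               new AllWindowFunction<RowData, RowData, TimeWindow>() {
                 @Override
                 public void apply(
                     TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
                   values.forEach(out::collect);
                 }
               });
   ```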



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
+                    // Just print all the data to confirm everything has arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+      env.executeAsync("Iceberg Source Windowing Test");
+
+      // Write data so the windows containing test data are closed
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+
+      assertRecords(resultIterator, expectedRecords);
+    }
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch;
+    batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+      JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test");
+
+      // Check that we read the non-blocked data
+      // The first RECORD_NUM_FOR_2_SPLITS records should be read
+      // 1 or more records from the runaway reader should arrive depending on thread scheduling
+      waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+      // Get the drift metric, wait for it to be created and reach the expected state
+      // (100 min - 20 min - 0 min)

Review Comment:
   Can you help me understand why the drift should reach 80 minutes?
   
   File 1 would advance reader 1's watermark to 100. File 2 would advance reader 2's watermark to 0. Wouldn't the drift be 100? Why would it eventually come down to 80 without new splits?
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldId The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(eventTimeFieldId);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.eventTimeFieldId = field.fieldId();
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+  }
+
+  @VisibleForTesting
+  ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+    this.eventTimeFieldId = eventTimeFieldId;
+    this.timeUnit = TimeUnit.MICROSECONDS;
+  }
+
+  /**
+   * Get the watermark for a split using column statistics.
+   *
+   * @param split The split
+   * @return The watermark
+   * @throws IllegalArgumentException if there is no statistics for the column
+   */
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask -> {
+              Preconditions.checkArgument(
+                  scanTask.file().lowerBounds() != null
+                      && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+                  "Missing statistics in file = %s for columnId = %s",
+                  scanTask.file(),

Review Comment:
   Also wondering if we should save the `eventTimeFieldName` in the constructor and use the name here.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldId The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(eventTimeFieldId);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.eventTimeFieldId = field.fieldId();
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+  }
+
+  @VisibleForTesting
+  ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+    this.eventTimeFieldId = eventTimeFieldId;
+    this.timeUnit = TimeUnit.MICROSECONDS;
+  }
+
+  /**
+   * Get the watermark for a split using column statistics.
+   *
+   * @param split The split
+   * @return The watermark
+   * @throws IllegalArgumentException if there is no statistics for the column
+   */
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask -> {
+              Preconditions.checkArgument(
+                  scanTask.file().lowerBounds() != null
+                      && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+                  "Missing statistics in file = %s for columnId = %s",
+                  scanTask.file(),

Review Comment:
   Do we want the whole `DataFile#toString` here, or just the file path? The full dump can be long. It is also better to log the column id (short) before the data file (long text).
   
   Maybe something like `Missing statistics for column %s in file %s`?
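   
   A sketch of the reordered message, also folding in the earlier suggestion to keep the field name around (assuming an `eventTimeFieldName` field; `path()` is the existing `DataFile` accessor):
   
   ```java
   Preconditions.checkArgument(
       scanTask.file().lowerBounds() != null
           && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
       "Missing statistics for column %s in file %s",
       eventTimeFieldName,
       scanTask.file().path());
   ```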



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103"));

Review Comment:
   I mean the string value does not follow the style of `file_1-recordTs_100`.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldId The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) {

Review Comment:
   `eventTimeFieldId` -> `eventTimeFieldName` or `eventTimeField`
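   
   i.e., a sketch of the rename applied to the constructor above:
   
   ```java
   public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
     Types.NestedField field = schema.findField(eventTimeFieldName);
     // ... rest unchanged
   }
   ```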



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldId The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(eventTimeFieldId);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.eventTimeFieldId = field.fieldId();
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+  }
+
+  @VisibleForTesting
+  ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+    this.eventTimeFieldId = eventTimeFieldId;
+    this.timeUnit = TimeUnit.MICROSECONDS;
+  }
+
+  /**
+   * Get the watermark for a split using column statistics.
+   *
+   * @param split The split
+   * @return The watermark
+   * @throws IllegalArgumentException if there is no statistics for the column
+   */
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask -> {
+              Preconditions.checkArgument(
+                  scanTask.file().lowerBounds() != null
+                      && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+                  "Missing statistics in file = %s for columnId = %s",
+                  scanTask.file(),
+                  (Object) Integer.valueOf(eventTimeFieldId));

Review Comment:
   Is this necessary? `(Object) Integer.valueOf(...)`
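   
   For reference, a sketch of the call without the cast; Guava's `checkArgument` overloads accept boxed primitives, so this should compile as-is:
   
   ```java
   Preconditions.checkArgument(
       scanTask.file().lowerBounds() != null
           && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
       "Missing statistics in file = %s for columnId = %s",
       scanTask.file(),
       eventTimeFieldId);
   ```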



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
+                    // Just print all the data to confirm everything has arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+      env.executeAsync("Iceberg Source Windowing Test");
+
+      // Write data so the windows containing test data are closed
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record")));
+
+      assertRecords(resultIterator, expectedRecords);
+    }
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch;
+    batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+      JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test");
+
+      // Check that we read the non-blocked data
+      // The first RECORD_NUM_FOR_2_SPLITS records should be read
+      // 1 or more records from the runaway reader should arrive depending on thread scheduling
+      waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+      // Get the drift metric, wait for it to be created and reach the expected state
+      // (100 min - 20 min - 0 min)

Review Comment:
   I am wondering if this setup could demonstrate throttling/alignment better:
   f1: 1 record with ts = 1 min
   f2: 50 records with ts = 2 min
   f3: 10 records with ts = 50 min
   f4: 1 record with ts = 40 min
   
   - setup: parallelism is 2, source discovery interval is 1 ms, watermark update interval is 1 ms, max drift is 20 min
   - add a map function that sleeps 1 ms for every message processed, to slow down the processing.
   - append f3 to the table after the result collected f1-r1 (indicating f1 finished).
   - append f4 to the table after the result collected f2-r50 (indicating f2 finished).
   - wait for f3-r10. Some of the f3 records should arrive only after the watermark advanced to minute 40, i.e. after f2 finished and f4 arrived. That would indicate alignment/throttling on f3; without alignment, f3 would have finished before f2.
   
   We would need to add a util method that waits for a specific record (a rough sketch follows below).
   
   My concern is whether this type of testing can be flaky.
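
   For illustration only, such a wait-for-record helper might look like the sketch below. It assumes the test exposes the `CloseableIterator<RowData>` returned by `collectAsync()` (as in the tests above) and that the string column sits at position 1 of the row; the name `waitForRecord` is hypothetical:

   ```java
   // Hypothetical helper, not part of the PR: block until a record whose string
   // column equals the expected value arrives, or fail if the stream ends first.
   // hasNext() blocks while the job is still running and may still emit results.
   private void waitForRecord(CloseableIterator<RowData> iterator, String expected) {
     while (iterator.hasNext()) {
       RowData row = iterator.next();
       if (expected.equals(row.getString(1).toString())) {
         return;
       }
     }

     Assert.fail("Stream ended before record with data=" + expected + " arrived");
   }
   ```

   In practice the call would be guarded by a test timeout (e.g. a JUnit timeout rule or an Awaitility wrapper), since `hasNext()` blocks indefinitely if the expected record never arrives.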



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));

Review Comment:
   Let's add a comment to explain it; the intention of the `% 5` is not super obvious. (A possible wording is sketched below.)
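
   For illustration, the requested comment might read along the lines of the sketch below. The stated rationale (capping timestamps at minutes 0-4 so file_2's records stay far behind file_1's and keep a low watermark) is an assumption about the test's design, not something confirmed in the PR:

   ```java
   // Hypothetical wording for the requested comment; the rationale is assumed.
   // Cap the timestamps at minutes 0-4 so every record of file_2, across both
   // splits, stays far behind file_1's timestamps (minutes 100 and 103) and
   // the file's watermark remains low.
   for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
     batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
   }
   ```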



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

