stevenzwu commented on code in PR #8553: URL: https://github.com/apache/iceberg/pull/8553#discussion_r1397782440
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ########## @@ -453,6 +492,18 @@ public IcebergSource<T> build() { contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema)); } + SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter(); + if (watermarkColumn != null) { Review Comment: surface the comment again on avoiding multiple small files in one split. it can increase out of orderliness, as multiple files with different time ranges are merged into one split. read.split.open-file-cost doesn't affect splitting big files. it just avoid bundling multiple smaller files in one split. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; + +public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover { Review Comment: I feel the functional test also covered the general/base part. will leave the decision to you if you think it is better to have a separate base test, ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the watermarks, records and updates the split position. + * + * <p>The Emitter emits watermarks at the beginning of every split provided by the {@link + * SplitWatermarkExtractor}. + */ +class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> { + private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); + private final SplitWatermarkExtractor timeExtractor; + private String lastSplitId = null; + private long watermark; + + WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { + this.timeExtractor = timeExtractor; + } + + @Override + public void emitRecord( + RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { + if (!split.splitId().equals(lastSplitId)) { + long newWatermark = timeExtractor.extractWatermark(split); + if (newWatermark < watermark) { + LOG.info( + "previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", Review Comment: > we need a summary in the beginning of the log message. "Received a new split with lower watermark. ..." bump this comment ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "timestamp_column", Types.TimestampType.withoutZone()), + required(2, "timestamptz_column", Types.TimestampType.withZone()), + required(3, "long_column", Types.LongType.get()), + required(4, "string_column", Types.StringType.get())); + + private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA); + + private static final List<List<Record>> TEST_RECORDS = + ImmutableList.of( + RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L)); + + private static final List<Map<String, Long>> MIN_VALUES = + ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3)); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private final String columnName; + + @BeforeClass + public static void updateMinValue() { + for (int i = 0; i < TEST_RECORDS.size(); ++i) { + for (Record r : TEST_RECORDS.get(i)) { + Map<String, Long> minValues = MIN_VALUES.get(i); + + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minValues.merge( + "timestamp_column", localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min); + + OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1); + minValues.merge("timestamptz_column", offsetDateTime.toInstant().toEpochMilli(), Math::min); + + minValues.merge("long_column", (Long) r.get(2), Math::min); + } + } + } + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return ImmutableList.of( + new Object[] {"timestamp_column"}, + new Object[] {"timestamptz_column"}, + new Object[] {"long_column"}); + } + + public TestColumnStatsWatermarkExtractor(String columnName) { + this.columnName = columnName; + } + + @Test + public void testSingle() throws IOException { + ColumnStatsWatermarkExtractor extractor = + new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS); + + Assert.assertEquals( + MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0))); + } + + @Test + public void testTimeUnit() throws IOException { + Assume.assumeTrue("Run only for long column", columnName.equals("long_column")); Review Comment: this is where junit5 can really help due to its flexible parameterization support at method level ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { Review Comment: Here is one idea of A/B testing to test the ordering part. Use allowed lateness to drop late arrival records * create 3 snapshots (one file for each). the 3 files have timestamp like 1, 50, 10 * job parallelism 2, set allowed lateness to 20, make sure 3 splits with target split size * slow down the processing of file 1 and 2 with some sleep. make sure the watermark advanced to 50 before processing file 3. * test A: use sequence number ordered assigner and allowed lateness. the records from file 3 should have been dropped and not in the final output * test B: enable watermark alignment. records from all 3 files should be kept in the final output ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = + ImmutableList.of( + generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) { + env.executeAsync("Iceberg Source Windowing Test"); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + + assertRecords(resultIterator, expectedRecords); + } + } + + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + List<Record> batch; + batch = + ImmutableList.of( + generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) { + JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test"); + + // Check that the read the non-blocked data + // The first RECORD_NUM_FOR_2_SPLITS should be read + // 1 or more from the runaway reader should be arrived depending on thread scheduling + waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); + + // Get the drift metric, wait for it to be created and reach the expected state + // (100 min - 20 min - 0 min) Review Comment: can you help me understand why the drift should reach 80 minutes? file 1 would advance reader 1 watermark to 100. file 2 would advance treader 2 watermark to 0. wouldn't drift be 100? why would it eventually come down to 80 without new splits? ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int eventTimeFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param eventTimeFieldId The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(eventTimeFieldId); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.eventTimeFieldId = field.fieldId(); + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; + } + + @VisibleForTesting + ColumnStatsWatermarkExtractor(int eventTimeFieldId) { + this.eventTimeFieldId = eventTimeFieldId; + this.timeUnit = TimeUnit.MICROSECONDS; + } + + /** + * Get the watermark for a split using column statistics. + * + * @param split The split + * @return The watermark + * @throws IllegalArgumentException if there is no statistics for the column + */ + @Override + public long extractWatermark(IcebergSourceSplit split) { + return split.task().files().stream() + .map( + scanTask -> { + Preconditions.checkArgument( + scanTask.file().lowerBounds() != null + && scanTask.file().lowerBounds().get(eventTimeFieldId) != null, + "Missing statistics in file = %s for columnId = %s", + scanTask.file(), Review Comment: also wondering if we should save the `eventTimeFieldName` in the constructor and use the name here ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int eventTimeFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param eventTimeFieldId The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(eventTimeFieldId); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.eventTimeFieldId = field.fieldId(); + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; + } + + @VisibleForTesting + ColumnStatsWatermarkExtractor(int eventTimeFieldId) { + this.eventTimeFieldId = eventTimeFieldId; + this.timeUnit = TimeUnit.MICROSECONDS; + } + + /** + * Get the watermark for a split using column statistics. + * + * @param split The split + * @return The watermark + * @throws IllegalArgumentException if there is no statistics for the column + */ + @Override + public long extractWatermark(IcebergSourceSplit split) { + return split.task().files().stream() + .map( + scanTask -> { + Preconditions.checkArgument( + scanTask.file().lowerBounds() != null + && scanTask.file().lowerBounds().get(eventTimeFieldId) != null, + "Missing statistics in file = %s for columnId = %s", + scanTask.file(), Review Comment: do we want the whole `DataFile#toString` here? or we just want to include the file path. full dump can be long. it is also better to log the column id (short) before the data file (long text). maybe sth like `Missing statistics for column %s in file %s`? ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); Review Comment: I mean the string value is not follow the style of `file_1-recordTs_100` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int eventTimeFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param eventTimeFieldId The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) { Review Comment: eventTimeFieldId -> eventTimeFieldName or eventTimeField ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int eventTimeFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param eventTimeFieldId The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(eventTimeFieldId); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.eventTimeFieldId = field.fieldId(); + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; + } + + @VisibleForTesting + ColumnStatsWatermarkExtractor(int eventTimeFieldId) { + this.eventTimeFieldId = eventTimeFieldId; + this.timeUnit = TimeUnit.MICROSECONDS; + } + + /** + * Get the watermark for a split using column statistics. + * + * @param split The split + * @return The watermark + * @throws IllegalArgumentException if there is no statistics for the column + */ + @Override + public long extractWatermark(IcebergSourceSplit split) { + return split.task().files().stream() + .map( + scanTask -> { + Preconditions.checkArgument( + scanTask.file().lowerBounds() != null + && scanTask.file().lowerBounds().get(eventTimeFieldId) != null, + "Missing statistics in file = %s for columnId = %s", + scanTask.file(), + (Object) Integer.valueOf(eventTimeFieldId)); Review Comment: is this necessary? `(Object) Integer.valueOf(...)` ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = + ImmutableList.of( + generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) { + env.executeAsync("Iceberg Source Windowing Test"); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + + assertRecords(resultIterator, expectedRecords); + } + } + + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + List<Record> batch; + batch = + ImmutableList.of( + generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) { + JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test"); + + // Check that the read the non-blocked data + // The first RECORD_NUM_FOR_2_SPLITS should be read + // 1 or more from the runaway reader should be arrived depending on thread scheduling + waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); + + // Get the drift metric, wait for it to be created and reach the expected state + // (100 min - 20 min - 0 min) Review Comment: I am wondering if this can demonstrate throttling/alignment better f1: 1 records with ts = 1 min f2: 50 records with ts = 2 min f3: 10 records with ts = 50 min f4: 1 record with ts = 40 min - setup: parallelism is 2. source discovery interval is 1 ms. watermark update interval is 1 ms. max drift is 20 mins - add a map function that sleep 1 ms for every msg processed to slow down the processing. - append f3 to the table after result collected f1-r1 (indicating f1 finished). - append f4 to the table after result collected f2-r50 (indicating f2 finished). - waiting for f3-r10. some of the f3 records should come after the watermark advanced to minute 40 with f2 finished and f4 arrived. this is an indication of alignment/throttling on f3. without alignment, f3 would have finished before f2. we need to add a util method to wait for a specific record. My concern is if this type of testing can be flaky. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); Review Comment: let's add a comment to explain it. it is not super obvious of the intention of the `% 5`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org