stevenzwu commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1397782440
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));
}
+ SerializableRecordEmitter<T> emitter =
SerializableRecordEmitter.defaultEmitter();
+ if (watermarkColumn != null) {
Review Comment:
Surfacing this comment again about avoiding multiple small files in one split.
It can increase out-of-orderliness, because multiple files with different time
ranges are merged into one split.
`read.split.open-file-cost` doesn't affect the splitting of big files. It only
avoids bundling multiple smaller files into one split.
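A rough sketch of the effect, assuming split planning weighs each file as
max(file size, open-file cost) when packing files into a split. The names
`SplitPackingSketch` and `pack` are made up for illustration; this is a
simplified model, not the Iceberg implementation, and it leaves out the
splitting of big files entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of packing files into splits.
class SplitPackingSketch {

  // Packs files (given by size in bytes) into splits, in order.
  // Each file weighs at least openFileCost, so a large cost keeps small files apart.
  static List<List<Long>> pack(List<Long> fileSizes, long targetSplitSize, long openFileCost) {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0;
    for (long size : fileSizes) {
      long weight = Math.max(size, openFileCost);
      // Start a new split when the next file would exceed the target size.
      if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
        splits.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(size);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Long> smallFiles = List.of(10L, 20L, 30L);
    long targetSplitSize = 128L;
    // Low open-file cost: the three small files are bundled into one split.
    System.out.println(pack(smallFiles, targetSplitSize, 4L));
    // Open-file cost equal to the target split size: one file per split.
    System.out.println(pack(smallFiles, targetSplitSize, targetSplitSize));
  }
}
```

Under this model, setting the open-file cost to the target split size is one
way to get one small file per split, which keeps each split's time range
narrow.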
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends
TestIcebergSourceFailover {
Review Comment:
I feel the functional test also covers the general/base part.
I will leave the decision to you if you think it is better to have a separate
base test.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The Emitter emits watermarks at the beginning of every split provided by
the {@link
+ * SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements
SerializableRecordEmitter<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+ private final SplitWatermarkExtractor timeExtractor;
+ private String lastSplitId = null;
+ private long watermark;
+
+ WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+ this.timeExtractor = timeExtractor;
+ }
+
+ @Override
+ public void emitRecord(
+ RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit
split) {
+ if (!split.splitId().equals(lastSplitId)) {
+ long newWatermark = timeExtractor.extractWatermark(split);
+ if (newWatermark < watermark) {
+ LOG.info(
+ "previous watermark = {}, current watermark = {}, previous split =
{}, current split = {}",
Review Comment:
> We need a summary at the beginning of the log message: "Received a new
> split with lower watermark. ..."

Bumping this comment.
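A sketch of the message shape I have in mind. `String.format` stands in for
the SLF4J call here so the snippet is self-contained, and the class and method
names are made up; the exact wording is up to you.

```java
// Stand-in for the logger call; only the message shape matters here.
class LowerWatermarkMessageSketch {

  // Leads with a one-sentence summary, then lists the details.
  static String message(
      long previousWatermark, long currentWatermark, String previousSplit, String currentSplit) {
    return String.format(
        "Received a new split with lower watermark. "
            + "Previous watermark = %d, current watermark = %d, "
            + "previous split = %s, current split = %s",
        previousWatermark, currentWatermark, previousSplit, currentSplit);
  }

  public static void main(String[] args) {
    System.out.println(message(100L, 50L, "split-1", "split-2"));
  }
}
```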
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+ public static final Schema SCHEMA =
+ new Schema(
+ required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+ required(2, "timestamptz_column", Types.TimestampType.withZone()),
+ required(3, "long_column", Types.LongType.get()),
+ required(4, "string_column", Types.StringType.get()));
+
+ private static final GenericAppenderFactory APPENDER_FACTORY = new
GenericAppenderFactory(SCHEMA);
+
+ private static final List<List<Record>> TEST_RECORDS =
+ ImmutableList.of(
+ RandomGenericData.generate(SCHEMA, 3, 2L),
RandomGenericData.generate(SCHEMA, 3, 19L));
+
+ private static final List<Map<String, Long>> MIN_VALUES =
+ ImmutableList.of(Maps.newHashMapWithExpectedSize(3),
Maps.newHashMapWithExpectedSize(3));
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE, SCHEMA);
+
+ private final String columnName;
+
+ @BeforeClass
+ public static void updateMinValue() {
+ for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+ for (Record r : TEST_RECORDS.get(i)) {
+ Map<String, Long> minValues = MIN_VALUES.get(i);
+
+ LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+ minValues.merge(
+ "timestamp_column",
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+ OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+ minValues.merge("timestamptz_column",
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+ minValues.merge("long_column", (Long) r.get(2), Math::min);
+ }
+ }
+ }
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return ImmutableList.of(
+ new Object[] {"timestamp_column"},
+ new Object[] {"timestamptz_column"},
+ new Object[] {"long_column"});
+ }
+
+ public TestColumnStatsWatermarkExtractor(String columnName) {
+ this.columnName = columnName;
+ }
+
+ @Test
+ public void testSingle() throws IOException {
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(SCHEMA, columnName,
TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(
+ MIN_VALUES.get(0).get(columnName).longValue(),
extractor.extractWatermark(split(0)));
+ }
+
+ @Test
+ public void testTimeUnit() throws IOException {
+ Assume.assumeTrue("Run only for long column",
columnName.equals("long_column"));
Review Comment:
This is where JUnit 5 can really help, thanks to its flexible parameterization
support at the method level.
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ @Test
+ public void testWindowing() throws Exception {
Review Comment:
Here is one idea for A/B testing the ordering part: use allowed lateness to
drop late-arriving records.
* Create 3 snapshots (one file each). The 3 files have timestamps like 1, 50,
10.
* Set job parallelism to 2 and allowed lateness to 20. Make sure there are 3
splits given the target split size.
* Slow down the processing of files 1 and 2 with some sleep. Make sure the
watermark has advanced to 50 before file 3 is processed.
* Test A: use the sequence-number-ordered assigner and allowed lateness. The
records from file 3 should be dropped and absent from the final output.
* Test B: enable watermark alignment. Records from all 3 files should be
kept in the final output.
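The arithmetic behind the two cases, as a deliberately simplified sketch. It
assumes a record is dropped once the watermark is ahead of its timestamp by
more than the allowed lateness, and it ignores window boundaries. It also
assumes that in test B alignment keeps the watermark from running ahead of
file 3 (the value 1 below is just an illustration). `AllowedLatenessSketch`
and `isDropped` are made-up names.

```java
// Simplified lateness rule; not how Flink's window operator is implemented.
class AllowedLatenessSketch {

  // True when the watermark is ahead of the record by more than the allowed lateness.
  static boolean isDropped(long recordTimestamp, long watermark, long allowedLateness) {
    return recordTimestamp + allowedLateness < watermark;
  }

  public static void main(String[] args) {
    long allowedLateness = 20L;
    long file3Timestamp = 10L;
    // Test A: the watermark is already at 50 when file 3 is read.
    System.out.println(isDropped(file3Timestamp, 50L, allowedLateness)); // prints true
    // Test B: assume alignment held the watermark back, e.g. at 1.
    System.out.println(isDropped(file3Timestamp, 1L, allowedLateness)); // prints false
  }
}
```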
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 60000)
+ // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3,
"file_3-recordTs_3")
+ List<Record> batch = ImmutableList.of(generateRecord(100, "100"),
generateRecord(103, "103"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+ }
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch =
+ ImmutableList.of(
+ generateRecord(1, "file_3-recordTs_1"), generateRecord(3,
"file_3-recordTs_3"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ sourceBuilder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10))
+
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(new RowDataTimestampAssigner()),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+ DataStream<RowData> windowed =
+ stream
+ .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+ .apply(
+ new AllWindowFunction<RowData, RowData, TimeWindow>() {
+ @Override
+ public void apply(
+ TimeWindow window, Iterable<RowData> values,
Collector<RowData> out) {
+ // Just print all the data to confirm everything has
arrived
+ values.forEach(out::collect);
+ }
+ });
+
+ try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+ env.executeAsync("Iceberg Source Windowing Test");
+
+ // Write data so the windows containing test data are closed
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+
+ assertRecords(resultIterator, expectedRecords);
+ }
+ }
+
+ @Test
+ public void testThrottling() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ List<Record> batch;
+ batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"), generateRecord(103,
"file_1-recordTs_103"));
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+ }
+
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ sourceBuilder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10))
+
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withWatermarkAlignment("iceberg", Duration.ofMinutes(20),
Duration.ofMillis(10)),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+ JobClient jobClient = env.executeAsync("Continuous Iceberg Source
Failover Test");
+
+ // Check that the read the non-blocked data
+ // The first RECORD_NUM_FOR_2_SPLITS should be read
+ // 1 or more from the runaway reader should be arrived depending on
thread scheduling
+ waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+ // Get the drift metric, wait for it to be created and reach the
expected state
+ // (100 min - 20 min - 0 min)
Review Comment:
Can you help me understand why the drift should reach 80 minutes?
File 1 would advance the reader 1 watermark to 100. File 2 would advance the
reader 2 watermark to 0. Wouldn't the drift be 100? Why would it eventually
come down to 80 without new splits?
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor,
Serializable {
+ private final int eventTimeFieldId;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldId The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId,
TimeUnit timeUnit) {
+ Types.NestedField field = schema.findField(eventTimeFieldId);
+ TypeID typeID = field.type().typeId();
+ Preconditions.checkArgument(
+ typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+ "Found %s, expected a LONG or TIMESTAMP column for watermark
generation.",
+ typeID);
+ this.eventTimeFieldId = field.fieldId();
+ // Use the timeUnit only for Long columns.
+ this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit :
TimeUnit.MICROSECONDS;
+ }
+
+ @VisibleForTesting
+ ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+ this.eventTimeFieldId = eventTimeFieldId;
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
+
+ /**
+ * Get the watermark for a split using column statistics.
+ *
+ * @param split The split
+ * @return The watermark
+ * @throws IllegalArgumentException if there is no statistics for the column
+ */
+ @Override
+ public long extractWatermark(IcebergSourceSplit split) {
+ return split.task().files().stream()
+ .map(
+ scanTask -> {
+ Preconditions.checkArgument(
+ scanTask.file().lowerBounds() != null
+ && scanTask.file().lowerBounds().get(eventTimeFieldId)
!= null,
+ "Missing statistics in file = %s for columnId = %s",
+ scanTask.file(),
Review Comment:
Also wondering if we should save the `eventTimeFieldName` in the constructor
and use the name here.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor,
Serializable {
+ private final int eventTimeFieldId;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldId The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId,
TimeUnit timeUnit) {
+ Types.NestedField field = schema.findField(eventTimeFieldId);
+ TypeID typeID = field.type().typeId();
+ Preconditions.checkArgument(
+ typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+ "Found %s, expected a LONG or TIMESTAMP column for watermark
generation.",
+ typeID);
+ this.eventTimeFieldId = field.fieldId();
+ // Use the timeUnit only for Long columns.
+ this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit :
TimeUnit.MICROSECONDS;
+ }
+
+ @VisibleForTesting
+ ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+ this.eventTimeFieldId = eventTimeFieldId;
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
+
+ /**
+ * Get the watermark for a split using column statistics.
+ *
+ * @param split The split
+ * @return The watermark
+ * @throws IllegalArgumentException if there is no statistics for the column
+ */
+ @Override
+ public long extractWatermark(IcebergSourceSplit split) {
+ return split.task().files().stream()
+ .map(
+ scanTask -> {
+ Preconditions.checkArgument(
+ scanTask.file().lowerBounds() != null
+ && scanTask.file().lowerBounds().get(eventTimeFieldId)
!= null,
+ "Missing statistics in file = %s for columnId = %s",
+ scanTask.file(),
Review Comment:
Do we want the whole `DataFile#toString` here, or just the file path? The
full dump can be long. It is also better to log the column id (short) before
the data file (long text).
Maybe something like `Missing statistics for column %s in file %s`?
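A minimal sketch of the suggested message, written in plain Java so it runs
without Iceberg on the classpath. The `checkArgument` helper is a hypothetical
stand-in for the relocated Guava `Preconditions.checkArgument`, and the `path`
string stands in for whatever file-path accessor the `DataFile` exposes; both
are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.Map;

class MissingStatsMessageSketch {
  // Hypothetical stand-in for Preconditions.checkArgument(boolean, String, Object...).
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // lowerBounds maps field id to the serialized lower bound; null means no stats at all.
  static void checkStats(Map<Integer, ByteBuffer> lowerBounds, int fieldId, String path) {
    checkArgument(
        lowerBounds != null && lowerBounds.get(fieldId) != null,
        "Missing statistics for column %s in file %s",
        fieldId, // short value first
        path); // long value last, and only the path instead of the full toString dump
  }

  public static void main(String[] args) {
    try {
      checkStats(null, 3, "/warehouse/db/t/data/00000-0.parquet");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Passing the `int` directly relies on autoboxing into the varargs `Object...`
parameter of this stand-in helper, so no explicit cast is needed in the sketch.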
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 60000)
+ // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3,
"file_3-recordTs_3")
+ List<Record> batch = ImmutableList.of(generateRecord(100, "100"),
generateRecord(103, "103"));
Review Comment:
I mean the string value does not follow the `file_1-recordTs_100` style.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor,
Serializable {
+ private final int eventTimeFieldId;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldId The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId,
TimeUnit timeUnit) {
Review Comment:
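`eventTimeFieldId` -> `eventTimeFieldName` or `eventTimeField`. The parameter
is a `String` that is passed to `schema.findField(...)`, so it is a column
name, not a field id.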
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor,
Serializable {
+ private final int eventTimeFieldId;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldId The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(Schema schema, String eventTimeFieldId,
TimeUnit timeUnit) {
+ Types.NestedField field = schema.findField(eventTimeFieldId);
+ TypeID typeID = field.type().typeId();
+ Preconditions.checkArgument(
+ typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+ "Found %s, expected a LONG or TIMESTAMP column for watermark
generation.",
+ typeID);
+ this.eventTimeFieldId = field.fieldId();
+ // Use the timeUnit only for Long columns.
+ this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit :
TimeUnit.MICROSECONDS;
+ }
+
+ @VisibleForTesting
+ ColumnStatsWatermarkExtractor(int eventTimeFieldId) {
+ this.eventTimeFieldId = eventTimeFieldId;
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
+
+ /**
+ * Get the watermark for a split using column statistics.
+ *
+ * @param split The split
+ * @return The watermark
+ * @throws IllegalArgumentException if there is no statistics for the column
+ */
+ @Override
+ public long extractWatermark(IcebergSourceSplit split) {
+ return split.task().files().stream()
+ .map(
+ scanTask -> {
+ Preconditions.checkArgument(
+ scanTask.file().lowerBounds() != null
+ && scanTask.file().lowerBounds().get(eventTimeFieldId)
!= null,
+ "Missing statistics in file = %s for columnId = %s",
+ scanTask.file(),
+ (Object) Integer.valueOf(eventTimeFieldId));
Review Comment:
is this necessary? `(Object) Integer.valueOf(...)`
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 60000)
+ // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3,
"file_3-recordTs_3")
+ List<Record> batch = ImmutableList.of(generateRecord(100, "100"),
generateRecord(103, "103"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+ }
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch =
+ ImmutableList.of(
+ generateRecord(1, "file_3-recordTs_1"), generateRecord(3,
"file_3-recordTs_3"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ sourceBuilder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10))
+
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(new RowDataTimestampAssigner()),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+ DataStream<RowData> windowed =
+ stream
+ .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+ .apply(
+ new AllWindowFunction<RowData, RowData, TimeWindow>() {
+ @Override
+ public void apply(
+ TimeWindow window, Iterable<RowData> values,
Collector<RowData> out) {
+ // Just print all the data to confirm everything has
arrived
+ values.forEach(out::collect);
+ }
+ });
+
+ try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+ env.executeAsync("Iceberg Source Windowing Test");
+
+ // Write data so the windows containing test data are closed
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+ dataAppender.appendToTable(ImmutableList.of(generateRecord(1500,
"last-record")));
+
+ assertRecords(resultIterator, expectedRecords);
+ }
+ }
+
+ @Test
+ public void testThrottling() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ List<Record> batch;
+ batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"), generateRecord(103,
"file_1-recordTs_103"));
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+ }
+
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ sourceBuilder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10))
+
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withWatermarkAlignment("iceberg", Duration.ofMinutes(20),
Duration.ofMillis(10)),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+ JobClient jobClient = env.executeAsync("Continuous Iceberg Source
Failover Test");
+
+ // Check that the read the non-blocked data
+ // The first RECORD_NUM_FOR_2_SPLITS should be read
+ // 1 or more from the runaway reader should be arrived depending on
thread scheduling
+ waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+ // Get the drift metric, wait for it to be created and reach the
expected state
+ // (100 min - 20 min - 0 min)
Review Comment:
I am wondering if this can demonstrate throttling/alignment better:
f1: 1 record with ts = 1 min
f2: 50 records with ts = 2 min
f3: 10 records with ts = 50 min
f4: 1 record with ts = 40 min
- Setup: parallelism is 2, source discovery interval is 1 ms, watermark
update interval is 1 ms, max drift is 20 min.
- Add a map function that sleeps 1 ms for every message processed to slow
down the processing.
- Append f3 to the table after the result has collected f1-r1 (indicating f1
finished).
- Append f4 to the table after the result has collected f2-r50 (indicating f2
finished).
- Wait for f3-r10. Some of the f3 records should arrive only after the
watermark has advanced to minute 40, once f2 has finished and f4 has arrived.
This indicates alignment/throttling on f3; without alignment, f3 would have
finished before f2.
We need to add a util method to wait for a specific record.
My concern is whether this type of test can be flaky.
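A rough sketch of the "wait for a specific record" util mentioned above, in
plain Java over a generic `Iterator`. The class and method names are
hypothetical, and the strings in `main` are placeholders for real rows. In the
actual test the iterator would be the one returned by `collectAsync()`, and a
timeout around the call would still be needed, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class WaitForRecordSketch {
  // Consumes the iterator until a record matching `target` is seen and returns
  // everything consumed so far, including the match. Fails if the iterator ends first.
  static <T> List<T> waitForRecord(Iterator<T> results, Predicate<? super T> target) {
    List<T> seen = new ArrayList<>();
    while (results.hasNext()) {
      T next = results.next();
      seen.add(next);
      if (target.test(next)) {
        return seen;
      }
    }
    throw new IllegalStateException("Stream ended before the expected record arrived");
  }

  public static void main(String[] args) {
    Iterator<String> results = List.of("f1-r1", "f2-r1", "f3-r1").iterator();
    // Stops as soon as f2-r1 shows up; f3-r1 stays unread in the iterator.
    System.out.println(waitForRecord(results, "f2-r1"::equals));
  }
}
```

Returning the records consumed so far lets the caller assert on ordering, for
example that some f3 records only show up after f2-r50.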
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 60000)
+ // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3,
"file_3-recordTs_3")
+ List<Record> batch = ImmutableList.of(generateRecord(100, "100"),
generateRecord(103, "103"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
Review Comment:
Let's add a comment to explain it. The intention of the `% 5` is not
obvious.
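For reference while writing that comment, a small sketch of what the
expression produces for the 200 records of file 2. It only tabulates the
values of `i % 5`; the class and method names are hypothetical. Whether the
goal is to keep all of these records inside the first 5-minute tumbling window
is my assumption, not something the diff states.

```java
import java.util.Map;
import java.util.TreeMap;

class ModuloTimestampSketch {
  // Counts how many of `recordCount` records receive each value of `i % modulus`.
  static Map<Integer, Integer> histogram(int recordCount, int modulus) {
    Map<Integer, Integer> counts = new TreeMap<>();
    for (int i = 0; i < recordCount; ++i) {
      counts.merge(i % modulus, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // 200 matches RECORD_NUM_FOR_2_SPLITS in the test.
    System.out.println(histogram(200, 5)); // prints {0=40, 1=40, 2=40, 3=40, 4=40}
  }
}
```

Note that the string part of each record uses `i`, not `i % 5`, so the name
suffix and the timestamp differ for every record after the first five.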