This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8109e420e6 Backporting Flink: Watermark Read Options to 1.17 and 1.16 (#9456)
8109e420e6 is described below
commit 8109e420e6d563a73e17ff23c321f1a48a2b976d
Author: Rodrigo <[email protected]>
AuthorDate: Fri Jan 12 22:15:55 2024 -0800
Backporting Flink: Watermark Read Options to 1.17 and 1.16 (#9456)
---
.../org/apache/iceberg/flink/FlinkReadConf.java | 19 +++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 11 ++
.../apache/iceberg/flink/source/IcebergSource.java | 19 ++-
.../apache/iceberg/flink/source/ScanContext.java | 38 +++++-
.../java/org/apache/iceberg/flink/TestHelpers.java | 17 ++-
.../iceberg/flink/source/TestIcebergSourceSql.java | 130 +++++++++++++++++++--
.../org/apache/iceberg/flink/FlinkReadConf.java | 19 +++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 11 ++
.../apache/iceberg/flink/source/IcebergSource.java | 19 ++-
.../apache/iceberg/flink/source/ScanContext.java | 38 +++++-
.../java/org/apache/iceberg/flink/TestHelpers.java | 18 ++-
.../iceberg/flink/source/TestIcebergSourceSql.java | 130 +++++++++++++++++++--
12 files changed, 431 insertions(+), 38 deletions(-)
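For reviewers, a minimal usage sketch of the backported read options (illustrative only, not part of the commit): the option keys `watermark-column` and `watermark-column-time-unit` come from the patch below, while the warehouse path, the table name `sample`, and the surrounding TableEnvironment wiring are assumptions. SQL hints require `table.dynamic-table-options.enabled`, which the updated TestIcebergSourceSql also sets.

    // Hypothetical SQL-hint usage of the new watermark read options (FLIP-27 source).
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WatermarkReadOptionsSqlExample {
      public static void main(String[] args) {
        TableEnvironment env =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Allow /*+ OPTIONS(...) */ hints, as the updated TestIcebergSourceSql does.
        env.getConfig().set("table.dynamic-table-options.enabled", "true");
        // Assumed Hadoop catalog and pre-existing table 'sample' with a long column 't2'.
        env.executeSql(
            "CREATE CATALOG iceberg_catalog WITH ('type'='iceberg', "
                + "'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
        env.executeSql("USE CATALOG iceberg_catalog");
        // Emit watermarks from the long column 't2', interpreting its values as milliseconds.
        env.executeSql(
                "SELECT * FROM sample /*+ OPTIONS('watermark-column'='t2', "
                    + "'watermark-column-time-unit'='MILLISECONDS') */")
            .print();
      }
    }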
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index 0e04c9affb..d53ea73f93 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import java.time.Duration;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public class FlinkReadConf {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}
+
+ public String watermarkColumn() {
+ return confParser
+ .stringConf()
+ .option(FlinkReadOptions.WATERMARK_COLUMN)
+ .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+ .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+ .parseOptional();
+ }
+
+ public TimeUnit watermarkColumnTimeUnit() {
+ return confParser
+ .enumConfParser(TimeUnit.class)
+ .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
+ .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
+ .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
+ .parse();
+ }
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 55c5aca3b6..1bbd88146c 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ public class FlinkReadOptions {
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+ public static final String WATERMARK_COLUMN = "watermark-column";
+ public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+ ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+
+ public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
+ public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
+ ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
+ .enumType(TimeUnit.class)
+ .defaultValue(TimeUnit.MICROSECONDS);
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index a7ce2db61f..0655cf87a9 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
- private String watermarkColumn;
- private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
- Preconditions.checkArgument(
- watermarkColumn == null,
- "Watermark column and SplitAssigner should not be set in the same
source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,7 +437,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
- * setting {@link #watermarkTimeUnit(TimeUnit)}.
+ * setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
- this.watermarkColumn = columnName;
+ readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}
@@ -459,8 +455,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
- public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
- this.watermarkTimeUnit = timeUnit;
+ public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
+ readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}
@@ -482,13 +478,16 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}
SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+ FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+ String watermarkColumn = flinkReadConf.watermarkColumn();
+ TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
+
if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 4357b1f57d..3dce5dd590 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
+ private final String watermarkColumn;
+ private final TimeUnit watermarkColumnTimeUnit;
private ScanContext(
boolean caseSensitive,
@@ -91,6 +94,8 @@ public class ScanContext implements Serializable {
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
+ String watermarkColumn,
+ TimeUnit watermarkColumnTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,6 +127,8 @@ public class ScanContext implements Serializable {
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
+ this.watermarkColumn = watermarkColumn;
+ this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
validate();
}
@@ -272,6 +279,14 @@ public class ScanContext implements Serializable {
return maxAllowedPlanningFailures;
}
+ public String watermarkColumn() {
+ return watermarkColumn;
+ }
+
+ public TimeUnit watermarkColumnTimeUnit() {
+ return watermarkColumnTimeUnit;
+ }
+
public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public class ScanContext implements Serializable {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(watermarkColumn)
+ .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}
@@ -327,6 +344,8 @@ public class ScanContext implements Serializable {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(watermarkColumn)
+ .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}
@@ -367,6 +386,9 @@ public class ScanContext implements Serializable {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+ private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+ private TimeUnit watermarkColumnTimeUnit =
+ FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
private Builder() {}
@@ -500,6 +522,16 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder watermarkColumn(String newWatermarkColumn) {
+ this.watermarkColumn = newWatermarkColumn;
+ return this;
+ }
+
+ public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+ this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+ return this;
+ }
+
public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public class ScanContext implements Serializable {
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
- .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(flinkReadConf.watermarkColumn())
+ .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
}
public ScanContext build() {
@@ -552,6 +586,8 @@ public class ScanContext implements Serializable {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
+ watermarkColumn,
+ watermarkColumnTimeUnit,
branch,
tag,
startTag,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 3e574b841c..b33f181dac 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -126,7 +126,7 @@ public class TestHelpers {
.collect(Collectors.toList());
}
- public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+ private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public class TestHelpers {
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+ return expected;
+ }
+
+ public static void assertRecordsWithOrder(
+ List<Row> results, List<Record> expectedRecords, Schema schema) {
+ List<Row> expected = convertRecordToRow(expectedRecords, schema);
+ assertRowsWithOrder(results, expected);
+ }
+
+ public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+ List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}
@@ -146,6 +157,10 @@ public class TestHelpers {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}
+ public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+ Assertions.assertThat(results).containsExactlyElementsOf(expected);
+ }
+
public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index e66ae79c28..4250460d27 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -18,25 +18,141 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
/** Use the IcebergSource (FLIP-27) */
public class TestIcebergSourceSql extends TestSqlBase {
+ private static final Schema SCHEMA_TS =
+ new Schema(
+ required(1, "t1", Types.TimestampType.withoutZone()),
+ required(2, "t2", Types.LongType.get()));
+
@Override
public void before() throws IOException {
- Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+ TableEnvironment tableEnvironment = getTableEnv();
+ Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+
+ tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
SqlHelpers.sql(
- getTableEnv(),
+ tableEnvironment,
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
catalogResource.warehouse());
- SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
- getTableEnv()
- .getConfig()
- .getConfiguration()
- .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+ tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ }
+
+ private Record generateRecord(Instant t1, long t2) {
+ Record record = GenericRecord.create(SCHEMA_TS);
+ record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+ record.setField("t2", t2);
+ return record;
+ }
+
+ /** Generates the records in the expected order, with respect to their datafile */
+ private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
+ Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+ long baseTime = 1702382109000L;
+
+ GenericAppenderHelper helper =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ Record file1Record1 =
+ generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+ Record file1Record2 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+ List<Record> recordsDataFile1 = Lists.newArrayList();
+ recordsDataFile1.add(file1Record1);
+ recordsDataFile1.add(file1Record2);
+ DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+
+ Record file2Record1 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+ Record file2Record2 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+ List<Record> recordsDataFile2 = Lists.newArrayList();
+ recordsDataFile2.add(file2Record1);
+ recordsDataFile2.add(file2Record2);
+
+ DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+ helper.appendToTable(dataFile1, dataFile2);
+
+ // Expected records if the splits are ordered
+ // - ascending (watermark from t1) - records from the split with early timestamps, then
+ // records from the split with late timestamps
+ // - descending (watermark from t2) - records from the split with old longs, then records
+ // from the split with new longs
+ List<Record> expected = Lists.newArrayList();
+ if (ascending) {
+ expected.addAll(recordsDataFile1);
+ expected.addAll(recordsDataFile2);
+ } else {
+ expected.addAll(recordsDataFile2);
+ expected.addAll(recordsDataFile1);
+ }
+ return expected;
+ }
+
+ /** Tests the order of splits returned when setting the watermark-column options */
+ @Test
+ public void testWatermarkOptionsAscending() throws Exception {
+ List<Record> expected = generateExpectedRecords(true);
+ TestHelpers.assertRecordsWithOrder(
+ run(
+ ImmutableMap.of("watermark-column", "t1", "split-file-open-cost",
"128000000"),
+ "",
+ "*"),
+ expected,
+ SCHEMA_TS);
+ }
+
+ /**
+ * Tests the order of splits returned when setting the watermark-column and
+ * watermark-column-time-unit options
+ */
+ @Test
+ public void testWatermarkOptionsDescending() throws Exception {
+ List<Record> expected = generateExpectedRecords(false);
+ TestHelpers.assertRecordsWithOrder(
+ run(
+ ImmutableMap.of(
+ "watermark-column",
+ "t2",
+ "watermark-column-time-unit",
+ "MILLISECONDS",
+ "split-file-open-cost",
+ "128000000"),
+ "",
+ "*"),
+ expected,
+ SCHEMA_TS);
}
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index 0e04c9affb..d53ea73f93 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import java.time.Duration;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public class FlinkReadConf {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}
+
+ public String watermarkColumn() {
+ return confParser
+ .stringConf()
+ .option(FlinkReadOptions.WATERMARK_COLUMN)
+ .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+ .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+ .parseOptional();
+ }
+
+ public TimeUnit watermarkColumnTimeUnit() {
+ return confParser
+ .enumConfParser(TimeUnit.class)
+ .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
+ .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
+ .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
+ .parse();
+ }
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 55c5aca3b6..1bbd88146c 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ public class FlinkReadOptions {
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+ public static final String WATERMARK_COLUMN = "watermark-column";
+ public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+ ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+
+ public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
+ public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
+ ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
+ .enumType(TimeUnit.class)
+ .defaultValue(TimeUnit.MICROSECONDS);
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index a7ce2db61f..0655cf87a9 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
- private String watermarkColumn;
- private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
- Preconditions.checkArgument(
- watermarkColumn == null,
- "Watermark column and SplitAssigner should not be set in the same
source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,7 +437,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
- * setting {@link #watermarkTimeUnit(TimeUnit)}.
+ * setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
- this.watermarkColumn = columnName;
+ readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}
@@ -459,8 +455,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
- public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
- this.watermarkTimeUnit = timeUnit;
+ public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
+ readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}
@@ -482,13 +478,16 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}
SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+ FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+ String watermarkColumn = flinkReadConf.watermarkColumn();
+ TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
+
if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 4357b1f57d..3dce5dd590 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
+ private final String watermarkColumn;
+ private final TimeUnit watermarkColumnTimeUnit;
private ScanContext(
boolean caseSensitive,
@@ -91,6 +94,8 @@ public class ScanContext implements Serializable {
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
+ String watermarkColumn,
+ TimeUnit watermarkColumnTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,6 +127,8 @@ public class ScanContext implements Serializable {
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
+ this.watermarkColumn = watermarkColumn;
+ this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
validate();
}
@@ -272,6 +279,14 @@ public class ScanContext implements Serializable {
return maxAllowedPlanningFailures;
}
+ public String watermarkColumn() {
+ return watermarkColumn;
+ }
+
+ public TimeUnit watermarkColumnTimeUnit() {
+ return watermarkColumnTimeUnit;
+ }
+
public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public class ScanContext implements Serializable {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(watermarkColumn)
+ .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}
@@ -327,6 +344,8 @@ public class ScanContext implements Serializable {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(watermarkColumn)
+ .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}
@@ -367,6 +386,9 @@ public class ScanContext implements Serializable {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+ private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+ private TimeUnit watermarkColumnTimeUnit =
+ FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
private Builder() {}
@@ -500,6 +522,16 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder watermarkColumn(String newWatermarkColumn) {
+ this.watermarkColumn = newWatermarkColumn;
+ return this;
+ }
+
+ public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+ this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+ return this;
+ }
+
public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public class ScanContext implements Serializable {
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
- .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+ .watermarkColumn(flinkReadConf.watermarkColumn())
+ .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
}
public ScanContext build() {
@@ -552,6 +586,8 @@ public class ScanContext implements Serializable {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
+ watermarkColumn,
+ watermarkColumnTimeUnit,
branch,
tag,
startTag,
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index d2d0c97f4b..80e5ddd24f 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -68,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
+import org.assertj.core.api.Assertions;
public class TestHelpers {
private TestHelpers() {}
@@ -126,7 +127,7 @@ public class TestHelpers {
.collect(Collectors.toList());
}
- public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+ private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
@@ -135,6 +136,17 @@ public class TestHelpers {
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+ return expected;
+ }
+
+ public static void assertRecordsWithOrder(
+ List<Row> results, List<Record> expectedRecords, Schema schema) {
+ List<Row> expected = convertRecordToRow(expectedRecords, schema);
+ assertRowsWithOrder(results, expected);
+ }
+
+ public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+ List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}
@@ -146,6 +158,10 @@ public class TestHelpers {
assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}
+ public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+ Assertions.assertThat(results).containsExactlyElementsOf(expected);
+ }
+
public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index e66ae79c28..4250460d27 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -18,25 +18,141 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
/** Use the IcebergSource (FLIP-27) */
public class TestIcebergSourceSql extends TestSqlBase {
+ private static final Schema SCHEMA_TS =
+ new Schema(
+ required(1, "t1", Types.TimestampType.withoutZone()),
+ required(2, "t2", Types.LongType.get()));
+
@Override
public void before() throws IOException {
- Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+ TableEnvironment tableEnvironment = getTableEnv();
+ Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+
+ tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
SqlHelpers.sql(
- getTableEnv(),
+ tableEnvironment,
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
catalogResource.warehouse());
- SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
- getTableEnv()
- .getConfig()
- .getConfiguration()
- .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+ tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ }
+
+ private Record generateRecord(Instant t1, long t2) {
+ Record record = GenericRecord.create(SCHEMA_TS);
+ record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+ record.setField("t2", t2);
+ return record;
+ }
+
+ /** Generates the records in the expected order, with respect to their datafile */
+ private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
+ Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+ long baseTime = 1702382109000L;
+
+ GenericAppenderHelper helper =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ Record file1Record1 =
+ generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+ Record file1Record2 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+ List<Record> recordsDataFile1 = Lists.newArrayList();
+ recordsDataFile1.add(file1Record1);
+ recordsDataFile1.add(file1Record2);
+ DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+
+ Record file2Record1 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+ Record file2Record2 =
+ generateRecord(
+ Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+ List<Record> recordsDataFile2 = Lists.newArrayList();
+ recordsDataFile2.add(file2Record1);
+ recordsDataFile2.add(file2Record2);
+
+ DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+ helper.appendToTable(dataFile1, dataFile2);
+
+ // Expected records if the splits are ordered
+ // - ascending (watermark from t1) - records from the split with early timestamps, then
+ // records from the split with late timestamps
+ // - descending (watermark from t2) - records from the split with old longs, then records
+ // from the split with new longs
+ List<Record> expected = Lists.newArrayList();
+ if (ascending) {
+ expected.addAll(recordsDataFile1);
+ expected.addAll(recordsDataFile2);
+ } else {
+ expected.addAll(recordsDataFile2);
+ expected.addAll(recordsDataFile1);
+ }
+ return expected;
+ }
+
+ /** Tests the order of splits returned when setting the watermark-column options */
+ @Test
+ public void testWatermarkOptionsAscending() throws Exception {
+ List<Record> expected = generateExpectedRecords(true);
+ TestHelpers.assertRecordsWithOrder(
+ run(
+ ImmutableMap.of("watermark-column", "t1", "split-file-open-cost",
"128000000"),
+ "",
+ "*"),
+ expected,
+ SCHEMA_TS);
+ }
+
+ /**
+ * Tests the order of splits returned when setting the watermark-column and
+ * watermark-column-time-unit options
+ */
+ @Test
+ public void testWatermarkOptionsDescending() throws Exception {
+ List<Record> expected = generateExpectedRecords(false);
+ TestHelpers.assertRecordsWithOrder(
+ run(
+ ImmutableMap.of(
+ "watermark-column",
+ "t2",
+ "watermark-column-time-unit",
+ "MILLISECONDS",
+ "split-file-open-cost",
+ "128000000"),
+ "",
+ "*"),
+ expected,
+ SCHEMA_TS);
}
}
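
As a closing note for this backport, a minimal DataStream-API sketch of the builder path the patch keeps (watermarkColumn / watermarkColumnTimeUnit, now resolved through FlinkReadConf). This is an illustration, not part of the commit; the warehouse location, table name, and job wiring are assumptions.

    // Hypothetical wiring of the watermark read options through the FLIP-27 IcebergSource builder.
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    public class WatermarkColumnSourceExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumed Hadoop-catalog table location; any TableLoader works here.
        TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/sample");

        IcebergSource<RowData> source =
            IcebergSource.forRowData()
                .tableLoader(tableLoader)
                // Order splits and emit per-split watermarks from the long column 't2',
                // interpreting its values as milliseconds.
                .watermarkColumn("t2")
                .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS)
                .build();

        // The source emits watermarks itself, so no extra WatermarkStrategy is applied here.
        DataStream<RowData> stream =
            env.fromSource(
                source,
                WatermarkStrategy.noWatermarks(),
                "iceberg-source",
                TypeInformation.of(RowData.class));
        stream.print();
        env.execute("watermark-column-example");
      }
    }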