This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ee33fa254 [flink] Support Delta Join on Flink 2.1 (#1726)
ee33fa254 is described below
commit ee33fa254d22ef2a851196975837a1cfaf0f7652
Author: Xuyang <[email protected]>
AuthorDate: Sat Sep 27 17:53:54 2025 +0800
[flink] Support Delta Join on Flink 2.1 (#1726)
---
.../apache/fluss/flink/catalog/Flink21Catalog.java | 106 +++++++++++++
.../fluss/flink/catalog/Flink21CatalogFactory.java | 34 ++++
.../org.apache.flink.table.factories.Factory | 2 +-
.../fluss/flink/catalog/Flink21CatalogITCase.java | 174 ++++++++++++++++++++-
.../flink/source/Flink21TableSourceITCase.java | 128 ++++++++++++++-
.../apache/fluss/flink/catalog/FlinkCatalog.java | 2 +-
.../fluss/flink/catalog/FlinkCatalogITCase.java | 17 +-
7 files changed, 457 insertions(+), 6 deletions(-)
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
new file mode 100644
index 000000000..4f567aae0
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+import org.apache.fluss.metadata.TableInfo;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** A {@link FlinkCatalog} used for Flink 2.1. */
+public class Flink21Catalog extends FlinkCatalog {
+
+ public Flink21Catalog(
+ String name,
+ String defaultDatabase,
+ String bootstrapServers,
+ ClassLoader classLoader,
+ Map<String, String> securityConfigs) {
+        super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath objectPath)
+ throws TableNotExistException, CatalogException {
+ CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
+ if (!(catalogBaseTable instanceof CatalogTable)) {
+ return catalogBaseTable;
+ }
+
+ CatalogTable table = (CatalogTable) catalogBaseTable;
+        Optional<Schema.UnresolvedPrimaryKey> pkOp = table.getUnresolvedSchema().getPrimaryKey();
+ // If there is no pk, return directly.
+ if (pkOp.isEmpty()) {
+ return table;
+ }
+
+ Schema.Builder newSchemaBuilder =
+ Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
+ // Pk is always an index.
+ newSchemaBuilder.index(pkOp.get().getColumnNames());
+
+ // Judge whether we can do prefix lookup.
+        TableInfo tableInfo = connection.getTable(toTablePath(objectPath)).getTableInfo();
+ List<String> bucketKeys = tableInfo.getBucketKeys();
+        // For partition table, the physical primary key is the primary key that excludes the
+        // partition key
+ List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
+ List<String> indexKeys = new ArrayList<>();
+ if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
+ indexKeys.addAll(bucketKeys);
+ if (tableInfo.isPartitioned()) {
+ indexKeys.addAll(tableInfo.getPartitionKeys());
+ }
+ }
+
+ if (!indexKeys.isEmpty()) {
+ newSchemaBuilder.index(indexKeys);
+ }
+ return CatalogTable.newBuilder()
+ .schema(newSchemaBuilder.build())
+ .comment(table.getComment())
+ .partitionKeys(table.getPartitionKeys())
+ .options(table.getOptions())
+ .snapshot(table.getSnapshot().orElse(null))
+ .distribution(table.getDistribution().orElse(null))
+ .build();
+ }
+
+    private static boolean isPrefixList(List<String> fullList, List<String> prefixList) {
+ if (fullList.size() <= prefixList.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < prefixList.size(); i++) {
+ if (!fullList.get(i).equals(prefixList.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
new file mode 100644
index 000000000..8557a552f
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+/** A {@link FlinkCatalogFactory} used for Flink 2.1. */
+public class Flink21CatalogFactory extends FlinkCatalogFactory {
+
+ @Override
+ public FlinkCatalog createCatalog(Context context) {
+ FlinkCatalog catalog = super.createCatalog(context);
+ return new Flink21Catalog(
+ catalog.catalogName,
+ catalog.defaultDatabase,
+ catalog.bootstrapServers,
+ catalog.classLoader,
+ catalog.securityConfigs);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 6544cb534..f13f71331 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
# limitations under the License.
#
-org.apache.fluss.flink.catalog.FlinkCatalogFactory
\ No newline at end of file
+org.apache.fluss.flink.catalog.Flink21CatalogFactory
\ No newline at end of file
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
index b12aaa5e8..c0b9b9196 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -17,5 +17,177 @@
package org.apache.fluss.flink.catalog;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
/** IT case for catalog in Flink 2.1. */
-public class Flink21CatalogITCase extends FlinkCatalogITCase {}
+public class Flink21CatalogITCase extends FlinkCatalogITCase {
+
+ @BeforeAll
+ static void beforeAll() {
+ FlinkCatalogITCase.beforeAll();
+
+ // close the old one and open a new one later
+ catalog.close();
+
+ catalog =
+ new Flink21Catalog(
+ catalog.catalogName,
+ catalog.defaultDatabase,
+ catalog.bootstrapServers,
+ catalog.classLoader,
+ catalog.securityConfigs);
+ catalog.open();
+ }
+
+ @Test
+ void testGetTableWithIndex() throws Exception {
+ String tableName = "table_with_pk_only";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss' "
+ + ")",
+ tableName));
+        CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ Schema expectedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .index("a", "b")
+ .build();
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_prefix_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .index("a", "b")
+ .index("a")
+ .build();
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_bucket_key_is_not_prefix_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'b'"
+ + ")",
+ tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .index("a", "b")
+ .build();
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_partition_1";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " dt varchar, "
+ + " primary key (a, b, dt) NOT ENFORCED "
+ + ") "
+ + " partitioned by (dt) "
+ + " with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .column("dt", DataTypes.STRING().notNull())
+ .primaryKey("a", "b", "dt")
+ .index("a", "b", "dt")
+ .index("a", "dt")
+ .build();
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_partition_2";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " dt varchar, "
+ + " primary key (dt, a, b) NOT ENFORCED "
+ + ") "
+ + " partitioned by (dt) "
+ + " with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .column("dt", DataTypes.STRING().notNull())
+ .primaryKey("dt", "a", "b")
+ .index("dt", "a", "b")
+ .index("a", "dt")
+ .build();
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+ }
+
+ @Override
+ protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {
+ super.addDefaultIndexKey(schemaBuilder);
+
+ Schema currentSchema = schemaBuilder.build();
+        currentSchema.getPrimaryKey().ifPresent(pk -> schemaBuilder.index(pk.getColumnNames()));
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
index b337cab46..9b1e908da 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
@@ -17,5 +17,131 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** IT case for {@link FlinkTableSource} in Flink 2.1. */
-public class Flink21TableSourceITCase extends FlinkTableSourceITCase {}
+public class Flink21TableSourceITCase extends FlinkTableSourceITCase {
+
+ @Test
+ void testDeltaJoin() throws Exception {
+ // start two jobs for this test: one for DML involving the delta join,
and the other for DQL
+ // to query the results of the sink table
+
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+                        // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+                        // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 13bd4b019..97933b035 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -114,7 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
protected final String catalogName;
protected final String defaultDatabase;
protected final String bootstrapServers;
- private final Map<String, String> securityConfigs;
+ protected final Map<String, String> securityConfigs;
protected Connection connection;
protected Admin admin;
private volatile @Nullable LakeCatalog lakeCatalog;
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 300e639e4..062c6eea7 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -86,7 +86,7 @@ abstract class FlinkCatalogITCase {
static final String CATALOG_NAME = "testcatalog";
static final String DEFAULT_DB =
FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
- static Catalog catalog;
+ static FlinkCatalog catalog;
protected TableEnvironment tEnv;
@@ -181,6 +181,7 @@ abstract class FlinkCatalogITCase {
.column("r", DataTypes.TIMESTAMP_LTZ())
.column("s", DataTypes.ROW(DataTypes.FIELD("a",
DataTypes.INT())))
.primaryKey("a");
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
CatalogTable table =
(CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB,
"test_table"));
@@ -291,6 +292,7 @@ abstract class FlinkCatalogITCase {
tEnv.executeSql("create table append_only_table(a int, b int) with
('bucket.num' = '10')");
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("a", DataTypes.INT()).column("b",
DataTypes.INT());
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
CatalogTable table =
(CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB,
"append_only_table"));
@@ -313,6 +315,7 @@ abstract class FlinkCatalogITCase {
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("dt", DataTypes.STRING());
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -366,6 +369,7 @@ abstract class FlinkCatalogITCase {
+ " 'table.auto-partition.time-unit' = 'year')");
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("a", DataTypes.INT()).column("b",
DataTypes.STRING());
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -475,6 +479,7 @@ abstract class FlinkCatalogITCase {
.column("b", DataTypes.STRING())
.column("c", DataTypes.STRING())
.column("hh", DataTypes.STRING());
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -559,6 +564,7 @@ abstract class FlinkCatalogITCase {
.column("order_time", DataTypes.TIMESTAMP(3))
.watermark("order_time", "`order_time` - INTERVAL '5' SECOND")
.primaryKey("user");
+ addDefaultIndexKey(schemaBuilder);
Schema expectedSchema = schemaBuilder.build();
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
Map<String, String> expectedOptions = new HashMap<>();
@@ -775,7 +781,14 @@ abstract class FlinkCatalogITCase {
"The configured default-database 'non-exist' does not
exist in the Fluss cluster.");
}
- private static void assertOptionsEqual(
+    /**
+     * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
+     * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an
+     * index.
+     */
+ protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {}
+
+ protected static void assertOptionsEqual(
Map<String, String> actualOptions, Map<String, String>
expectedOptions) {
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
actualOptions.remove(ConfigOptions.TABLE_REPLICATION_FACTOR.key());